Темы и логика упругости рабочего процесса
Я хотел бы получить лучшее понимание ожидаемого поведения субъектов при использовании с операторами устойчивости, а именно: retry и retryWhen.
Следующие примеры кода будут немного отличаться от их аналогов в JSBin (см. Примеры ссылок) тем, что я использовал функции и типы стрелок для более удобного использования, это использует версию 4.0.0 - 4.0.7
Мое ожидаемое поведение устойчивости может быть выражено на следующем примере:
Rx.Observable
.interval(1000)
.flatMap( (count:number) => {
return count === 4 ? Rx.Observable.throw('Break') : Rx.Observable.return(count);
})
.retry()
.take(5);
Output
// 0
// 1
// 2
// 3
// 0 <-- Retry means we start again from scratch (expected)
До этого момента все было согласованно, то есть после того, как в четвертом уведомлении произошла ошибка, весь поток перезапускается с нуля (выигрыш для архитектуры без сохранения состояния).
Теперь мы можем почесать голову, если мы добавим оператор многоадресной рассылки и при этом добавим базовый субъект (в моем случае это ReplaySubject с буфером 1), например:
const consumer : Rx.Observable<number> = Rx.Observable
.interval(1000)
.flatMap( (count:number) => {
return count === 4 ? Rx.Observable.throw('Break') : Rx.Observable.return(count);
})
.shareReplay(1) /* multicast(new Rx.ReplaySubject(1)).refCount() */
.retry()
.take(5);
const firstSubscriber : Rx.Disposable = consumer.subscribe( (next:number) => {
console.log('first subscriber: ' + next);
});
setTimeout(() => {
firstSubscriber.dispose(); /* Lets start fresh in that refCount === 0 */
const secondSubscriber : Rx.Disposable = consumer.subscribe( (next) => {
console.log('second subscriber: ' + next);
});
}, 5000 );
Output (before error is thrown)
// "first subscriber: 0"
// "first subscriber: 1"
// "first subscriber: 2"
// "first subscriber: 3"
Output (after error is thrown)
// "first subscriber: 3"
// "second subscriber: 3"
// "second subscriber: 3"
// "second subscriber: 3"
// "second subscriber: 3"
// "second subscriber: 3"
Быстрый просмотр субъекта определяет, когда возникает ошибка, субъект помечается как inError, и каждый будущий подписчик получает последнее уведомление (строка 46) и сразу после вызова onError (строка 50).
Так, где это оставляет нас? По моему мнению, я не верю, что вы когда-либо сможете использовать оператор устойчивости, если он следует за любым другим оператором, который содержит тему (shareReplay, publish и т. Д.).
На данный момент я думаю, что единственный способ добиться успеха в этом проекте - это обеспечить, когда произошла ошибка и был удален узел, всякий раз, когда использовался объект, необходимо было создать новый объект (и вниз по кроличьей норе мы начинаем идти)?
Многоадресная рассылка может занять фабрику /subjectSelector:
.multicast( () => new Rx.ReplaySubject(1), (source:Rx.ConnectableObservable) => source );
Если посмотреть на источник, если вы используете subjectSelector вместо того, чтобы просто передавать тему непосредственно для каждой новой подписки, будет вызван subjectSelector и будет создан новый ConnectableObservable (строка 11).
На данный момент я не уверен, что совместное использование (через некоторый кеш) и удаление (при обнаружении ошибки) субъектов фактически даст многоадресную рассылку подписчикам?
Приступая к этому вопросу, я также написал RecoverableReplaySubject, в котором я удалил состояние ошибки при утилизации, это было больше для тестирования и ожидалось, что команда RxJS включит этот рабочий процесс по уважительной причине.
Любое руководство и опыт по этой теме будет принята с благодарностью.
Спасибо
1 ответ
shareReplay
предметы имеют различную семантику, когда дело доходит до ошибок и завершений. Например, даже если базовая наблюдаемая завершена (refCount == 0
), shareReplay
не будет завершено, так что дальнейший вызов к нему произведет (воспроизведет) прошлые значения. Ср jsbin(shareReplay) против jsbin(share).
var source = Rx.Observable
.interval(100)
.take(5)
.shareReplay()
var first = source.subscribe( function(next) {
console.log('first subscriber: ' + next);
});
setTimeout(function() {
// first.dispose();
var second = source.subscribe( function(next) {
console.log('second subscriber: ' + next);
});
}, 1000 );
В противном случае вы найдете там объяснения о поведении shareReplay
(с обсуждением вашей проблемы) против других операторов:
- https://github.com/ReactiveX/rxjs/issues/1110
- https://github.com/ReactiveX/rxjs/issues/453 (это долгое обсуждение, начните с
jhusain commented on Nov 4, 2015
)
Предложенное решение состояло в том, чтобы точно использовать фабричную функцию для оператора многоадресной рассылки. В любом случае, не должно быть слишком сложно попробовать ваш новый дизайн и посмотреть, работает ли он.