Темы и логика упругости рабочего процесса

Я хотел бы получить лучшее понимание ожидаемого поведения субъектов при использовании с операторами устойчивости, а именно: retry и retryWhen.

Следующие примеры кода будут немного отличаться от их аналогов в JSBin (см. Примеры ссылок) тем, что я использовал функции и типы стрелок для более удобного использования, это использует версию 4.0.0 - 4.0.7

Мое ожидаемое поведение устойчивости может быть выражено на следующем примере:

Rx.Observable
  .interval(1000)
  .flatMap( (count:number) => { 
    return count === 4 ? Rx.Observable.throw('Break') : Rx.Observable.return(count);
  })
  .retry()
  .take(5);

 Output 
 // 0
 // 1
 // 2
 // 3 
 // 0 <-- Retry means we start again from scratch (expected)

До этого момента все было согласованно, то есть после того, как в четвертом уведомлении произошла ошибка, весь поток перезапускается с нуля (выигрыш для архитектуры без сохранения состояния).

Теперь мы можем почесать голову, если мы добавим оператор многоадресной рассылки и при этом добавим базовый субъект (в моем случае это ReplaySubject с буфером 1), например:

const consumer : Rx.Observable<number> = Rx.Observable
  .interval(1000)
  .flatMap( (count:number) => { 
    return count === 4 ? Rx.Observable.throw('Break') : Rx.Observable.return(count);
  })
  .shareReplay(1) /* multicast(new Rx.ReplaySubject(1)).refCount() */
  .retry()
  .take(5);

const firstSubscriber : Rx.Disposable = consumer.subscribe( (next:number) => {
   console.log('first subscriber: ' + next);
});

setTimeout(() => {
   firstSubscriber.dispose(); /* Lets start fresh in that refCount === 0 */
   const secondSubscriber : Rx.Disposable = consumer.subscribe( (next) => {
      console.log('second subscriber: ' + next);
   });
}, 5000 );

Output (before error is thrown)
// "first subscriber: 0"
// "first subscriber: 1"
// "first subscriber: 2"
// "first subscriber: 3"
Output (after error is thrown)
// "first subscriber: 3"
// "second subscriber: 3"
// "second subscriber: 3"
// "second subscriber: 3"
// "second subscriber: 3"
// "second subscriber: 3" 

Быстрый просмотр субъекта определяет, когда возникает ошибка, субъект помечается как inError, и каждый будущий подписчик получает последнее уведомление (строка 46) и сразу после вызова onError (строка 50).

Так, где это оставляет нас? По моему мнению, я не верю, что вы когда-либо сможете использовать оператор устойчивости, если он следует за любым другим оператором, который содержит тему (shareReplay, publish и т. Д.).

На данный момент я думаю, что единственный способ добиться успеха в этом проекте - это обеспечить, когда произошла ошибка и был удален узел, всякий раз, когда использовался объект, необходимо было создать новый объект (и вниз по кроличьей норе мы начинаем идти)?

Многоадресная рассылка может занять фабрику /subjectSelector:

.multicast( () => new Rx.ReplaySubject(1), (source:Rx.ConnectableObservable) => source );

Если посмотреть на источник, если вы используете subjectSelector вместо того, чтобы просто передавать тему непосредственно для каждой новой подписки, будет вызван subjectSelector и будет создан новый ConnectableObservable (строка 11).

На данный момент я не уверен, что совместное использование (через некоторый кеш) и удаление (при обнаружении ошибки) субъектов фактически даст многоадресную рассылку подписчикам?

Приступая к этому вопросу, я также написал RecoverableReplaySubject, в котором я удалил состояние ошибки при утилизации, это было больше для тестирования и ожидалось, что команда RxJS включит этот рабочий процесс по уважительной причине.

Любое руководство и опыт по этой теме будет принята с благодарностью.

Спасибо

1 ответ

shareReplay предметы имеют различную семантику, когда дело доходит до ошибок и завершений. Например, даже если базовая наблюдаемая завершена (refCount == 0), shareReplay не будет завершено, так что дальнейший вызов к нему произведет (воспроизведет) прошлые значения. Ср jsbin(shareReplay) против jsbin(share).

var source = Rx.Observable
      .interval(100)
      .take(5)
      .shareReplay()

var first = source.subscribe( function(next) {
  console.log('first subscriber: ' + next);
});

setTimeout(function() {
//  first.dispose();
  var second = source.subscribe( function(next) {
  console.log('second subscriber: ' + next);
});

}, 1000 );

В противном случае вы найдете там объяснения о поведении shareReplay (с обсуждением вашей проблемы) против других операторов:

Предложенное решение состояло в том, чтобы точно использовать фабричную функцию для оператора многоадресной рассылки. В любом случае, не должно быть слишком сложно попробовать ваш новый дизайн и посмотреть, работает ли он.

Другие вопросы по тегам