RxJs Interval с takeUntil для публикации последнего значения

У меня есть код, который опрашивает, пока задача не будет завершена

Увидеть ниже

this.simulationStatus =
  interval(2000).pipe(
    switchMap(
      () => from(this.simulationService.getSimulationStatus(this.route.snapshot.paramMap.get('jobId')))),
    takeUntil(this.stopPoll),
    tap(simulation => {
      if (simulation && simulation.complete) {
        if (this.stopCount == 1) {
          // Get once after complete
          this.stopPoll.next(true);
        }
        this.stopCount++;
      }
    })
  );

Я пытался использовать takeUntil и takeWhile, проблема в том, что последнее значение никогда не публикуется после завершения задачи.

Чтобы обойти это, я должен включить метод tap для субъекта stopPoll и увеличить значение stopCount, чтобы получить последнее значение.

Таким образом, вышесказанное работает, но кажется, что это немного грязно, я уверен, что должен быть лучший способ добиться этого?

Я ожидал, что takeUntil опубликует последнее значение или переопределит его, например, takeUntil(observable, {publishLast: true})

КСТАТИ, Обновление, наблюдаемый подписан на шаблон Angular 6 Заранее спасибо

3 ответа

Решение

Одна вещь, которую вы можете сделать, это использовать пользовательский оператор типа takeWhile, например:

const completeWith = <T>(predicate: (arg: T) => boolean) => (
  source: Observable<T>,
) =>
  new Observable<T>(observer =>
    source.subscribe(
      value => {
        observer.next(value);
        if (predicate(value)) {
          observer.complete();
        }
      },
      error => observer.error(error),
      () => observer.complete(),
    ),
  );

Не кажется хорошей идеей рассматривать его как вариант takeWhite, потому что он не просто принимает значения, пока выполняется условие, но также выдает дополнительное значение.

Может оказаться, что более элегантным решением было бы сделать наблюдаемый статус симуляции излучающим два вида значений: следующие уведомления и уведомления о завершении, аналогично тому, как работают операторы материализации / дематериализации.

Тем временем это было реализовано в rxjs как takeWhile(condition, ?inclusive):

      timer(0, 10).pipe(
    takeWhile((x) => x < 3, true)
)

излучает 0, 1, 2, 3

Вы также можете создать тему и создать с помощью next(), если хотите завершить наблюдаемое.

this.stopPoll: Subject<any> = new Subject<any>();

Если вы хотите сделать завершить подписку. Вы можете вызвать this.stopPoll.next(true);

Вы можете получить доступ к данным в подписке ()

this.simulationStatus.subscribe(success=>{}, failure=>{}, complete=>{});
Другие вопросы по тегам