Наблюдаемые операторы "что-то делают" через некоторое время

В цепочке наблюдаемых rxjs, как я могу сделать что-то с доступом к текущему значению наблюдаемой по истечении заданного количества времени? По сути, я ищу что-то вроде оператора tap, но он выполняется только в том случае, если прошло определенное количество времени, не видя значения из наблюдаемой. Так что на самом деле это как комбинация касания и тайм-аута.

Я представляю что-то вроде следующего

observable$.pipe(
  first(x => x > 5),
  tapAfterTime(2000, x => console.log(x)),
  map(x => x + 1)
).subscribe(...);

Это вымышленный пример, а функция tapAfterTime нереальна. Но основная идея состоит в том, что если после подписки прошло 2000 мс, а наблюдаемое не увидело значение больше 5, то сделайте функцию обратного вызова tapAfterTime для любого текущего значения наблюдаемого. Если бы мы увидели значение больше 5 до 2000 мс, обратный вызов tapAfterTime никогда не запустился бы, но функция карты всегда работала бы так, как ожидалось.

Есть ли оператор для достижения этой или любой комбинации операторов?

3 ответа

Может быть, это действительно слишком сложно, может быть, это стоит посмотреть.

Идея состоит в том, чтобы иметь 2 разных наблюдаемых, созданных трансформируя источник observable$ а потом в итоге слились.

Первая наблюдаемая, давайте назовем это obsFilterAndMappedэто тот, где вы делаете свою фильтрацию и отображение.

Вторая наблюдаемая, давайте назовем это obsTapDelay, является наблюдаемой, которая запускает новый таймер с определенной задержкой в любое время, когда появляется первая наблюдаемая, т.е. obsFilterAndMapped, испускает - как только задержка времени пройдена, вы выполняете tapAfterTime действие - если первый Observable испускает новое значение перед передачей delayTime, то создается новый таймер.

Это код, который реализует эту идею

const stop = new Subject<any>();
const obsShared = observable$.pipe(
    finalize(() => {
        console.log('STOP');
        stop.next();
        stop.complete()
    }),
    share()
);
const delayTime = 300;
const tapAfterTime = (value) => {
    console.log('tap with delay', value)
}; 

let valueEmitted;

const obsFilterAndMapped = obsShared.pipe(
    tap(val => valueEmitted = val),
    filter(i => i > 7),
    map(val => val + ' mapped')
);

const startTimer = merge(of('START'), obsFilterAndMapped);

const obsTapDelay = startTimer.pipe(
    switchMap(val => timer(delayTime).pipe(
        tap(() => tapAfterTime(valueEmitted)),
        switchMap(() => empty()),
    )),
    takeUntil(stop),
)

merge(obsFilterAndMapped, obsTapDelay)
.subscribe(console.log, null, () => console.log('completed'))

При таком подходе вы выполняете tapAfterTime действие в любое время источник observable$ не излучает ничего дольше, чем delayTime, Другими словами, это будет работать не только для первой эмиссии observable$ но на всю жизнь.

Вы можете проверить такой код с помощью следующего ввода

const obs1 = interval(100).pipe(
    take(10),
);
const obs2 = timer(2000, 100).pipe(
    take(10),
    map(val => val + 200),
);
const observable$ = merge(obs1, obs2);

С некоторой дополнительной работой мы можем даже думать, чтобы скрыть valueEmitted глобальная переменная внутри замыкания, но это увеличит сложность кода и, вероятно, не стоит.

Взгляни на

А может как то так?

-

initiateTimer() {
    if (this.timerSub) {
        this.timerSub.unsubscribe();
    }

    this.timerSub = Rx.Observable.timer(2000)
        .take(1)
        .subscribe(this.showPopup.bind(this));
}

Это может быть что-то вроде этого:

let cancel;
observable$.pipe(
  tap((x)=>clear=setTimeout(()=>console.log(x), 2000)),
   filter(x => x > 5),
   tap(x => clearTimeout(clear)),
   map(x => x + 1)
);
Другие вопросы по тегам