Наблюдаемые операторы "что-то делают" через некоторое время
В цепочке наблюдаемых rxjs, как я могу сделать что-то с доступом к текущему значению наблюдаемой по истечении заданного количества времени? По сути, я ищу что-то вроде оператора tap, но он выполняется только в том случае, если прошло определенное количество времени, не видя значения из наблюдаемой. Так что на самом деле это как комбинация касания и тайм-аута.
Я представляю что-то вроде следующего
observable$.pipe(
first(x => x > 5),
tapAfterTime(2000, x => console.log(x)),
map(x => x + 1)
).subscribe(...);
Это вымышленный пример, а функция tapAfterTime нереальна. Но основная идея состоит в том, что если после подписки прошло 2000 мс, а наблюдаемое не увидело значение больше 5, то сделайте функцию обратного вызова tapAfterTime для любого текущего значения наблюдаемого. Если бы мы увидели значение больше 5 до 2000 мс, обратный вызов tapAfterTime никогда не запустился бы, но функция карты всегда работала бы так, как ожидалось.
Есть ли оператор для достижения этой или любой комбинации операторов?
3 ответа
Может быть, это действительно слишком сложно, может быть, это стоит посмотреть.
Идея состоит в том, чтобы иметь 2 разных наблюдаемых, созданных трансформируя источник observable$
а потом в итоге слились.
Первая наблюдаемая, давайте назовем это obsFilterAndMapped
это тот, где вы делаете свою фильтрацию и отображение.
Вторая наблюдаемая, давайте назовем это obsTapDelay
, является наблюдаемой, которая запускает новый таймер с определенной задержкой в любое время, когда появляется первая наблюдаемая, т.е. obsFilterAndMapped
, испускает - как только задержка времени пройдена, вы выполняете tapAfterTime
действие - если первый Observable испускает новое значение перед передачей delayTime, то создается новый таймер.
Это код, который реализует эту идею
const stop = new Subject<any>();
const obsShared = observable$.pipe(
finalize(() => {
console.log('STOP');
stop.next();
stop.complete()
}),
share()
);
const delayTime = 300;
const tapAfterTime = (value) => {
console.log('tap with delay', value)
};
let valueEmitted;
const obsFilterAndMapped = obsShared.pipe(
tap(val => valueEmitted = val),
filter(i => i > 7),
map(val => val + ' mapped')
);
const startTimer = merge(of('START'), obsFilterAndMapped);
const obsTapDelay = startTimer.pipe(
switchMap(val => timer(delayTime).pipe(
tap(() => tapAfterTime(valueEmitted)),
switchMap(() => empty()),
)),
takeUntil(stop),
)
merge(obsFilterAndMapped, obsTapDelay)
.subscribe(console.log, null, () => console.log('completed'))
При таком подходе вы выполняете tapAfterTime
действие в любое время источник observable$
не излучает ничего дольше, чем delayTime
, Другими словами, это будет работать не только для первой эмиссии observable$
но на всю жизнь.
Вы можете проверить такой код с помощью следующего ввода
const obs1 = interval(100).pipe(
take(10),
);
const obs2 = timer(2000, 100).pipe(
take(10),
map(val => val + 200),
);
const observable$ = merge(obs1, obs2);
С некоторой дополнительной работой мы можем даже думать, чтобы скрыть valueEmitted
глобальная переменная внутри замыкания, но это увеличит сложность кода и, вероятно, не стоит.
Взгляни на
- https://www.learnrxjs.io/operators/creation/timer.html
- https://www.learnrxjs.io/operators/creation/interval.html
А может как то так?
-
initiateTimer() {
if (this.timerSub) {
this.timerSub.unsubscribe();
}
this.timerSub = Rx.Observable.timer(2000)
.take(1)
.subscribe(this.showPopup.bind(this));
}
Это может быть что-то вроде этого:
let cancel;
observable$.pipe(
tap((x)=>clear=setTimeout(()=>console.log(x), 2000)),
filter(x => x > 5),
tap(x => clearTimeout(clear)),
map(x => x + 1)
);