Наблюдаемые RxJ прослушивают в течение 10 секунд, возвращают только 5 полученных значений, отбрасывают остальные и продолжают прослушивание?
У меня есть наблюдаемая, которая будет принимать несколько торговых значений в реальном времени (возможно, много в секунду) из концентратора SignalR. То, что я пытаюсь достичь, - это наблюдаемая величина, которая непрерывно (каждые 10 секунд) выводит выборку из 5 сделок, произошедших за последние 10 секунд.
Я написал наблюдаемый канал, чтобы попытаться добиться этого, добавив все полученные сделки в буфер на 10 секунд, а затем создав наблюдаемую для каждой сделки в буферном массиве, используя «concatMap» и «от». Затем создаем еще один буфер, который собирает 5 значений и выдает их.
this.bufferedTradeObservable$ = this.tradeReceived
.pipe(
tap(v => console.log('pipe-start: ', v)),
distinct((e: Trade) => e.tradeId),
bufferTime(10000),
concatMap((tradeArray) => {
return from(tradeArray);
}),
bufferCount(5),
tap(v => console.log('pipe-end: ', v))
);
Однако канал продолжает выдавать все значения, которые он получает в 10-секундном окне, но в группах по 5. Я попытался добавить
take(5)
в канале после карты concat, и он работает правильно для первой партии из 5 значений, но затем наблюдаемое «завершается» и перестает прослушивать новые значения. Я также попытался добавить фильтр с индексом после concatMap следующим образом:
filter((v, i) => (i < 6 )),
Это работает для первого пакета из 5 значений, но затем продолжает отфильтровывать каждое значение, поэтому второй буфер из 5 никогда не создается. Также этот вариант использования фильтра кажется устаревшим.
Я не уверен, упускаю ли я из виду что-то очевидное, но я просмотрел многие операторы rxjs и не могу найти способ добиться этого.
3 ответа
Похоже, все, что вам нужно, это bufferTime. Вы можете решить, что оставить, а что выбросить позже.
this.bufferedTradeObservable$ = this.tradeReceived.pipe(
// Buffer for 1 seconds
bufferTime(10000),
// Only emit the last 5 values from the buffer.
map(buffer => buffer.slice(-5))
);
имеет
maxBufferSize
аргумент, который сделает это за вас.
this.bufferedTradeObservable$ = this.tradeReceived
.pipe(
tap(v => console.log('pipe-start: ', v)),
distinct((e: Trade) => e.tradeId),
bufferTime(10000, 10000, 5),
concatMap((tradeArray) => {
return from(tradeArray);
}),
tap(v => console.log('pipe-end: ', v))
);
Вы также можете использовать вместо этого вывод каждого значения, как только оно будет создано, а не ждать всех 5.
this.bufferedTradeObservable$ = this.tradeReceived
.pipe(
tap(v => console.log('pipe-start: ', v)),
distinct((e: Trade) => e.tradeId),
windowTime(10000, 10000, 5),
mergeAll()
tap(v => console.log('pipe-end: ', v))
);
Они описаны в документации дляbufferTime
а такжеwindowTime
соответственно.
Как насчет чего-то подобного,
let n = 5;
let t = 10;
//Source, emits a value every second (just a placeholder for real source)
const source = interval(1000);
//Take n=5 values from the source, then end the stream
const takeNValues = source.pipe(take(n));
//Every t=10 seconds switch to a new observable that emits n=5 values and then closes
const takeNValuesEveryTSeconds = interval(t * 1000).pipe(
switchMap(() => takeNValues)
);
//Subscribe and log n=5 values every t=10 seconds
takeNValuesEveryTSeconds.subscribe(n =>
console.log('Value => ', n)
);