Наблюдаемые RxJ прослушивают в течение 10 секунд, возвращают только 5 полученных значений, отбрасывают остальные и продолжают прослушивание?

У меня есть наблюдаемая, которая будет принимать несколько торговых значений в реальном времени (возможно, много в секунду) из концентратора SignalR. То, что я пытаюсь достичь, - это наблюдаемая величина, которая непрерывно (каждые 10 секунд) выводит выборку из 5 сделок, произошедших за последние 10 секунд.

Я написал наблюдаемый канал, чтобы попытаться добиться этого, добавив все полученные сделки в буфер на 10 секунд, а затем создав наблюдаемую для каждой сделки в буферном массиве, используя «concatMap» и «от». Затем создаем еще один буфер, который собирает 5 значений и выдает их.

      this.bufferedTradeObservable$ = this.tradeReceived
      .pipe(
        tap(v => console.log('pipe-start: ', v)),
        distinct((e: Trade) => e.tradeId),
        bufferTime(10000),
        concatMap((tradeArray) => {
            return from(tradeArray);
        }),
        bufferCount(5),
        tap(v => console.log('pipe-end: ', v))
      );

Однако канал продолжает выдавать все значения, которые он получает в 10-секундном окне, но в группах по 5. Я попытался добавить take(5)в канале после карты concat, и он работает правильно для первой партии из 5 значений, но затем наблюдаемое «завершается» и перестает прослушивать новые значения. Я также попытался добавить фильтр с индексом после concatMap следующим образом:

      filter((v, i) => (i < 6 )),

Это работает для первого пакета из 5 значений, но затем продолжает отфильтровывать каждое значение, поэтому второй буфер из 5 никогда не создается. Также этот вариант использования фильтра кажется устаревшим.

Я не уверен, упускаю ли я из виду что-то очевидное, но я просмотрел многие операторы rxjs и не могу найти способ добиться этого.

3 ответа

Похоже, все, что вам нужно, это bufferTime. Вы можете решить, что оставить, а что выбросить позже.

      this.bufferedTradeObservable$ = this.tradeReceived.pipe(
  // Buffer for 1 seconds
  bufferTime(10000),
  // Only emit the last 5 values from the buffer.
  map(buffer => buffer.slice(-5))
);

имеет maxBufferSizeаргумент, который сделает это за вас.

      this.bufferedTradeObservable$ = this.tradeReceived
      .pipe(
        tap(v => console.log('pipe-start: ', v)),
        distinct((e: Trade) => e.tradeId),
        bufferTime(10000, 10000, 5),
        concatMap((tradeArray) => {
            return from(tradeArray);
        }),
        tap(v => console.log('pipe-end: ', v))
      );

Вы также можете использовать вместо этого вывод каждого значения, как только оно будет создано, а не ждать всех 5.

      this.bufferedTradeObservable$ = this.tradeReceived
      .pipe(
        tap(v => console.log('pipe-start: ', v)),
        distinct((e: Trade) => e.tradeId),
        windowTime(10000, 10000, 5),
        mergeAll()
        tap(v => console.log('pipe-end: ', v))
      );

Они описаны в документации дляbufferTimeа такжеwindowTimeсоответственно.

Как насчет чего-то подобного,

      let n = 5;
let t = 10;

//Source, emits a value every second (just a placeholder for real source)

const source = interval(1000);

//Take n=5 values from the source, then end the stream
const takeNValues = source.pipe(take(n));

//Every t=10 seconds switch to a new observable that emits n=5 values and then closes

const takeNValuesEveryTSeconds = interval(t * 1000).pipe(
  switchMap(() => takeNValues)
);

//Subscribe and log n=5 values every t=10 seconds

takeNValuesEveryTSeconds.subscribe(n => 
  console.log('Value => ', n)
);
Другие вопросы по тегам