Оператор потока массивов в массив потоков

В Rx.js, как превратить поток массивов в массив потоков, например, у меня есть поток следующего:['0a','0b'], ['1a','1b'],['2a','2b','2c'] и я хочу получить следующие потоки:

0a---1a---2a--->
0b---1b---2b--->
          2c--->

Есть ли какие-либо операторы, которые делают что-то подобное, или я должен написать один с нуля?

2 ответа

Решение

Как то так должно работать

stream.
  flatMap(array =>
    Rx.Observable.from(
      array.map((obj, i) => {index: i, ...obj})
    )
  ).groupBy(x => x.index, ).
  subscribe(x =>
    x.map((x,i) => subscribe(x))
  )

Вы можете достичь этого относительно легко с существующими операторами.

То, что вы хотите достичь, очень похоже на то, что описано здесь: RXJS: попеременно объединять элементы потоков

Это предлагает два пути:

  1. с использованием Rx.Observable.zip оператор (принимает в качестве аргумента массив наблюдаемых и испускает поток массивов, элемент которого по индексу n является x-м значением, испускаемым n-й наблюдаемой)

    Однако это решение, примененное в вашем примере, остановится на 1a,1b потому что результирующая наблюдаемая будет завершена, как только одна из наблюдаемых завершится.

  2. расширяя ваши массивы, чтобы придать им одинаковую длину, заполнив их фиктивными значениями и применив Rx.Observable.zip оператор

В этих обоих вариантах:

  • если вы удалите последнюю строку, .concatMap.... вы получите поток массива, как [0a,0b], [1a,1b], [2a,2b,2c] из которого вы можете легко сопоставить по индексу (.map(function(array){return array[N];}) получу тебя [Na,Nb...]) чтобы получить поток, который вы хотите.
  • ИЛИ вы можете сохранить тот же код и добавить .filter(function(value,index){return index % N == I}), где N это количество потоков, и I поток, который вы хотите, то есть поток со значениями (Ia,Ib...)

Документация о zip operator> http://reactivex.io/documentation/operators/zip.html https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/zip.md,

Другие вопросы по тегам