Оператор потока массивов в массив потоков
В Rx.js, как превратить поток массивов в массив потоков, например, у меня есть поток следующего:['0a','0b'], ['1a','1b'],['2a','2b','2c'] и я хочу получить следующие потоки:
0a---1a---2a--->
0b---1b---2b--->
2c--->
Есть ли какие-либо операторы, которые делают что-то подобное, или я должен написать один с нуля?
2 ответа
Как то так должно работать
stream.
flatMap(array =>
Rx.Observable.from(
array.map((obj, i) => {index: i, ...obj})
)
).groupBy(x => x.index, ).
subscribe(x =>
x.map((x,i) => subscribe(x))
)
Вы можете достичь этого относительно легко с существующими операторами.
То, что вы хотите достичь, очень похоже на то, что описано здесь: RXJS: попеременно объединять элементы потоков
Это предлагает два пути:
с использованием
Rx.Observable.zip
оператор (принимает в качестве аргумента массив наблюдаемых и испускает поток массивов, элемент которого по индексу n является x-м значением, испускаемым n-й наблюдаемой)Однако это решение, примененное в вашем примере, остановится на
1a,1b
потому что результирующая наблюдаемая будет завершена, как только одна из наблюдаемых завершится.расширяя ваши массивы, чтобы придать им одинаковую длину, заполнив их фиктивными значениями и применив
Rx.Observable.zip
оператор
В этих обоих вариантах:
- если вы удалите последнюю строку,
.concatMap....
вы получите поток массива, как[0a,0b], [1a,1b], [2a,2b,2c]
из которого вы можете легко сопоставить по индексу (.map(function(array){return array[N];})
получу тебя[Na,Nb...]
) чтобы получить поток, который вы хотите. - ИЛИ вы можете сохранить тот же код и добавить
.filter(function(value,index){return index % N == I})
, гдеN
это количество потоков, иI
поток, который вы хотите, то есть поток со значениями(Ia,Ib...)
Документация о zip
operator> http://reactivex.io/documentation/operators/zip.html https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/zip.md,