RXJS: попеременно объединять элементы потоков
Я хотел бы поочередно объединить элементы нескольких потоков:
var print = console.log.bind(console);
var s1 = Rx.Observable.fromArray([1, 1, 5]);
var s2 = Rx.Observable.fromArray([2, 9]);
var s3 = Rx.Observable.fromArray([3, 4, 6, 7, 8]);
alternate(s1, s2, s3).subscribe(print); // 1, 2, 3, 1, 9, 4, 5, 6, 7, 8
Как выглядит определение функции alternate
?
2 ответа
Используйте zip и concatMap при работе с наблюдаемыми, которые были созданы из массивов (как в вашем примере), или zip и flatMap при работе с наблюдаемыми, которые изначально асинхронны.
Rx.Observable
.zip(s1, s2, s3, function(x,y,z) { return [x,y,z]; })
.concatMap(function (list) { return Rx.Observable.from(list); })
.subscribe(print); // 1, 2, 3, 1, 9, 4
Обратите внимание, что это больше не происходит после завершения одной из исходных наблюдаемых. Это потому что zip
строго "сбалансирован" и ждет, пока все источники не получат соответствующее событие. Что вам нужно, так это несколько свободная версия zip при работе с законченными источниками.
Если есть значение (например, undefined
), которое не испускается исходными наблюдаемыми, это решение работает:
var concat = Rx.Observable.concat;
var repeat = Rx.Observable.repeat;
var zipArray = Rx.Observable.zipArray;
var fromArray = Rx.Observable.fromArray;
var print = console.log.bind(console);
var s1 = fromArray([1, 1, 5]);
var s2 = fromArray([2, 9]);
var s3 = fromArray([3, 4, 6, 7, 8]);
alternate(s1, s2, s3).subscribe(print);
function alternate() {
var sources = Array.slice(arguments).map(function(s) {
return concat(s, repeat(undefined))
});
return zipArray(sources)
.map(function(values) {
return values.filter(function(x) {
return x !== undefined;
});
}).takeWhile(function(values) {
return values.length > 0;
}).concatMap(function (list) { return fromArray(list); })
}
Тот же пример в ES6:
const {concat, repeat, zipArray, fromArray} = Rx.Observable;
var print = console.log.bind(console);
var s1 = fromArray([1, 1, 5]);
var s2 = fromArray([2, 9]);
var s3 = fromArray([3, 4, 6, 7, 8]);
alternate(s1, s2, s3).subscribe(print);
function alternate(...sources) {
return zipArray(sources.map( (s) => concat(s, repeat(undefined)) ))
.map((values) => values.filter( (x) => x !== undefined ))
.takeWhile( (values) => values.length > 0)
.concatMap( (list) => fromArray(list) )
}