Эквивалент почтового оператора RxJs в xstream?
Здравствуйте, я пытаюсь выяснить, существует ли эквивалентный zx-оператор RxJs в xstream или хотя бы способ получить такое же поведение. В случае, если кому-то понадобится разъяснить разницу, ниже будут показаны мраморные диаграммы.
zip in rxjs
|---1---2---3-----------5->
|-a------b------c---d----->
"zip"
|-1a----2b------3c-----5d->
whereas 'combineLatest' aka 'combine' in xstream does
|---1---2----------4---5->
|----a---b---c---d------->
"combine"
|-1a----2a-2b-2c-2d-4d-5d>
Любая помощь приветствуется, так как я очень плохо знаком с программированием потоков. Заранее спасибо!
1 ответ
Мне также был нужен zip-оператор для xstream. Поэтому я создал свой собственный из существующих операторов. Для архивирования требуется произвольное количество потоков.
function zip(...streams) {
// Wrap the events on each stream with a label
// so that we can seperate them into buckets later.
const streamsLabeled = streams
.map((stream$, idx) => stream$.map(event => ({label: idx + 1, event: event})));
return (event$) => {
// Wrap the events on each stream with a label
// so that we can seperate them into buckets later.
const eventLabeled$ = event$.map(event => ({label: 0, event: event}));
const labeledStreams = [eventLabeled$, ...streamsLabeled];
// Create the buckets used to store stream events
const buckets = labeledStreams.map((stream, idx) => idx)
.reduce((buckets, label) => ({...buckets, [label]: []}), {});
// Initial value for the fold operation
const accumulator = {buckets, tuple: []};
// Merge all the streams together and accumulate them
return xs.merge(...labeledStreams).fold((acc, event) => {
// Buffer the events into seperate buckets
acc.buckets[event.label].push(event);
// Does the first value of all the buckets have something in it?
// If so, then there is a complete tuple.
const tupleComplete = Object.keys(acc.buckets)
.map(key => acc.buckets[key][0])
.reduce((hadValue, value) => value !== undefined
? true && hadValue
: false && hadValue,
true);
// Save completed tuple and remove it from the buckets
if (tupleComplete) {
acc.tuple = [...Object.keys(acc.buckets).map(key => acc.buckets[key][0].event)];
Object.keys(acc.buckets).map(key => acc.buckets[key].shift());
} else {
// Clear tuple since all columns weren't filled
acc.tuple = [];
}
return {...acc};
}, accumulator)
// Only emit when we have a complete tuple
.filter(buffer => buffer.tuple.length !== 0)
// Just return the complete tuple
.map(buffer => buffer.tuple);
};
}
Это может быть использовано с compose.
foo$.compose(zip(bar$)).map(([foo, bar]) => doSomething(foo, bar))