RxJs: форма с потерями zip-оператора
Подумайте об использовании оператора zip для объединения двух бесконечных наблюдаемых, одна из которых испускает элементы в два раза чаще, чем другая.
Текущая реализация без потерь, т. Е. Если я продолжу излучение этих Observables в течение часа, а затем переключусь между их скоростями излучения, первая Observable в конце концов догонит другую.
Это приведет к взрыву памяти в какой-то момент, так как буфер становится все больше и больше.
То же самое произойдет, если первая наблюдаемая будет излучать предметы в течение нескольких часов, а вторая - в конце.
Как мне добиться поведения с потерями для этого оператора? Я просто хочу излучать каждый раз, когда получаю выбросы из обоих потоков, и мне все равно, сколько выбросов из более быстрого потока я пропускаю.
Разъяснения:
- Главная проблема, которую я пытаюсь решить, - это взрыв памяти из-за природы без потерь
zip
оператор. - Я хочу излучать каждый раз, когда получаю выбросы отобоих потоков, даже если оба потока излучают одно и то же значение каждый раз
Пример:
Stream1: 1 2 3 4 5 6 7
Stream2: 10 20 30 40 50 60 70
регулярноеzip
выдаст следующий вывод:
[1, 10]
[2, 20]
[3, 30]
[4, 40]
[5, 50]
[6, 60]
[7, 70]
const Observable = Rx.Observable;
const Subject = Rx.Subject;
const s1 = new Subject();
const s2 = new Subject();
Observable.zip(s1,s2).subscribe(console.log);
s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);
s2.next(40); s2.next(50); s2.next(60); s2.next(70);
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>
Вывод, который я хотел бы получить:
[1, 10]
[3, 20]
[5, 30]
Объяснение:
с потерями zip
оператор zip
с размером буфера 1
, Это означает, что он сохранит только первый элемент из потока, который выдал первый, и потеряет все остальные (элементы, которые поступают между первым элементом и первым излучением из второго потока). Итак, что происходит в примере: stream1
излучает 1
, с потерями почтовый индекс "запоминает" его и игнорирует все элементы на stream1
до тех пор stream2
излучает. Первая эмиссия stream2
является 10
так stream1
теряет 2
, После взаимной эмиссии (первая эмиссия с потерями zip
) это начинается заново: "помни" 3
"свободный" 4
, испускают [3,20]
, Тогда начните сначала: "помните" 5
"свободный" 6
а также 7
, испускают [5,30]
, Тогда начните сначала: "помните" 40
"свободный"50
,60
, 70
и ждать следующего пункта наstream1
,
Пример 2:
Stream1: 1 2 3 ... 100000000000
Stream2: a
регулярное zip
Оператор взорвет память в этом случае.
Я не хочу этого.
Резюме:
По сути, я ожидаю потери zip
оператор запоминает только первое значение, испускаемое stream 1
после предыдущей взаимной эмиссии и эмиссии, когда stream 2
догоняет stream 1
, И повтори.
5 ответов
Следующее даст вам желаемое поведение:
Observable.zip(s1.take(1), s2.take(1)).repeat()
В RxJs 5.5
Синтаксис канала:
zip(s1.pipe(take(1)), s2.pipe(take(1))).pipe(repeat());
const s1 = new Rx.Subject();
const s2 = new Rx.Subject();
Rx.Observable.zip(s1.take(1), s2.take(1)).repeat()
.subscribe(console.log);
s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);
s2.next(40); s2.next(50); s2.next(60); s2.next(70);
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>
Объяснение:
repeat
оператор (в его текущей реализации) повторно подписывается на источник, наблюдаемый после его завершения, т.е. в данном конкретном случае он повторно подписывается наzip
на каждую взаимную эмиссию.zip
объединяет две наблюдаемые и ждет, когда они оба испустят.combineLatest
будет делать то же самое, это не имеет большого значения из-заtake(1)
take(1)
на самом деле заботится о взрыве памяти и определяет поведение с потерями
Если вы хотите получить последнее, а не первое значение из каждого потока при взаимном излучении, используйте это:
Observable.combineLatest(s1, s2).take(1).repeat()
В RxJs 5.5
Синтаксис канала:
combineLatest(s1.pipe(take(1)), s2.pipe(take(1))).pipe(repeat());
const s1 = new Rx.Subject();
const s2 = new Rx.Subject();
Rx.Observable.combineLatest(s1,s2).take(1).repeat()
.subscribe(console.log);
s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);
s2.next(40); s2.next(50); s2.next(60); s2.next(70);
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>
Я думаю, что следующее должно всегда принимать последнее значение из каждого наблюдаемого источника.
const source1 = Observable.interval(1000).publish();
const source2 = Observable.interval(300).publish();
source1.connect();
source2.connect();
Observable.defer(() => Observable.forkJoin(
source1.takeUntil(source2.skipUntil(source1)),
source2.takeUntil(source1.skipUntil(source2))
))
.take(1)
.repeat()
.subscribe(console.log);
Демонстрация в реальном времени: http://jsbin.com/vawewew/11/edit?js,console
Это печатает:
[ 0, 2 ]
[ 1, 5 ]
[ 2, 8 ]
[ 3, 12 ]
[ 4, 15 ]
[ 5, 18 ]
Вам может понадобиться включить source1
а также source2
в горячие Observables, если они еще не.
Редактировать:
Основная часть source1.takeUntil(source2.skipUntil(source1))
, Это принимает значения от source1
до тех пор source2
излучает. Но в то же время он будет игнорировать source1
до тех пор source2
испускает хотя бы одно значение:).
forkJoin()
Наблюдаемые работы ждут, пока оба источника не завершат, помня последнюю эмиссию от каждого из них.
Затем мы хотим повторить процесс, и поэтому мы используем take(1)
завершить цепочку и .repeat()
немедленно подписаться
Это дает последовательность [ 0, 2 ] [ 1, 5 ] [ 2, 8 ] [ 3, 12 ] ...
const interval1 = Rx.Observable.interval(1000)
const interval2 = Rx.Observable.interval(300)
const combined = Rx.Observable.combineLatest(interval1, interval2);
const fresh = combined.scan((acc, x) => {
return x[0] === acc[0] || x[1] === acc[1] ? acc : x
})
.distinctUntilChanged() //fresh ones only
fresh.subscribe(console.log);
с возможно меньшим количеством операторов. Не уверен, насколько это эффективно.
CodePen
Для обновления № 3,
Тогда вам понадобится ключ для каждого исходного элемента.
// Simulated sources according to latest spec provided (update #3)
const source1 = Rx.Observable.from(['x','y','z'])
const source2 = Rx.Observable.from(['a','a','b','b','c'])
// Create keys for sources
let key1 = 0
let key2 = 0
const keyed1 = source1.map(x => [x, key1++])
const keyed2 = source2.map(x => [x, key2++])
const combined = Rx.Observable
.combineLatest(keyed1, keyed2)
.map(([keyed1, keyed2]) => [...keyed1, ...keyed2]) // to simplify scan below
combined.subscribe(console.log) // not the output, for illustration only
console.log('-------------------------------------')
const fresh = combined.scan((acc, x) => {
return x[1] === acc[1] || x[3] === acc[3] ? acc : x
})
.distinctUntilChanged() //fresh ones only
const dekeyed = fresh
.map(keyed => { return [keyed[0], keyed[2]] })
dekeyed.subscribe(console.log); // required output
Это производит
["x", "a"]
["y", "a"]
["z", "b"]
CodePen (обновить страницу CodePen после открытия консоли, для лучшего отображения)
Вы упомянули размер буфера 1, задаваясь вопросом, будет ли это делать архивирование двух объектов ReplaySubjects с размером буфера 1?
Я добавляю другой ответ для ясности, поскольку он идет после принятого ответа (но основывается на моем предыдущем ответе).
Простите, если я неправильно понял, но я ожидал, что решение будет обрабатывать переключение скоростей излучения:
затем я переключаюсь между их скоростями излучения,
Поставляемый тест не переключает скорость излучения до тех пор, пока не остановится первый поток,
Stream1: 1 2 3 4 5 6 7
Stream2: 10 20 30 40 50 60 70
поэтому я попробовал еще один тест
Stream1: 1 2 3 4 5 6
Stream2: 10 20 30 40 50 60
Тестовые данные для этого потока
s1.next(1); s1.next(2); s2.next(10); s2.next(20); s1.next(3); s1.next(4);
s2.next(30); s2.next(40); s1.next(5); s1.next(6); s2.next(50); s2.next(60);
Насколько я понимаю, принятый ответ не проходит этот тест.
Выводит
[1, 10]
[3, 20]
[4, 30]
[5, 40]
[6, 50]
тогда как я ожидал увидеть
[1, 10]
[3, 30]
[5, 50]
если оператор должен быть симметричным (коммутативным?)
Улучшение моего предыдущего ответа
Это решение построено на основе базовых операторов, поэтому его легче понять. Я не могу говорить об его эффективности, возможно, проверю это на другой итерации.
const s1 = new Rx.Subject();
const s2 = new Rx.Subject();
const tagged1 = s1.map(x=>[x,1])
const tagged2 = s2.map(x=>[x,2])
const merged = tagged1.merge(tagged2)
const fresh = merged.scan((acc, x) => {
return x[1] === acc[1] ? acc : x
})
.distinctUntilChanged() //fresh ones only
const dekeyed = fresh.map(keyed => keyed[0])
const paired = dekeyed.pairwise()
let index = 0
const sequenced = paired.map(x=>[x,index++])
const alternates = sequenced.filter(x => x[1] % 2 === 0)
const deindexed = alternates.map(x=>x[0])
или в более компактной форме, если предпочтительнее
let index = 0
const output =
s1.map(x=>[x,1]).merge(s2.map(x=>[x,2])) // key by stream id
.scan((acc, x) => {
return x[1] === acc[1] ? acc : x
})
.distinctUntilChanged() //fresh ones only
.map(keyed => keyed[0]) // de-key
.pairwise() // pair
.map(x=>[x,index++]) // add a sequence no
.filter(x => x[1] % 2 === 0) // take even sequence
.map(x=>x[0]) // deindex
Для тестирования CodePen (обновите страницу CodePen после открытия консоли, для лучшего отображения)