RxJs: форма с потерями zip-оператора

Подумайте об использовании оператора zip для объединения двух бесконечных наблюдаемых, одна из которых испускает элементы в два раза чаще, чем другая.
Текущая реализация без потерь, т. Е. Если я продолжу излучение этих Observables в течение часа, а затем переключусь между их скоростями излучения, первая Observable в конце концов догонит другую.
Это приведет к взрыву памяти в какой-то момент, так как буфер становится все больше и больше.
То же самое произойдет, если первая наблюдаемая будет излучать предметы в течение нескольких часов, а вторая - в конце.

Как мне добиться поведения с потерями для этого оператора? Я просто хочу излучать каждый раз, когда получаю выбросы из обоих потоков, и мне все равно, сколько выбросов из более быстрого потока я пропускаю.

Разъяснения:

  • Главная проблема, которую я пытаюсь решить, - это взрыв памяти из-за природы без потерьzipоператор.
  • Я хочу излучать каждый раз, когда получаю выбросы отобоих потоков, даже если оба потока излучают одно и то же значение каждый раз

Пример:

Stream1: 1 2    3 4    5 6 7                
Stream2:     10     20       30 40 50 60 70

регулярноеzipвыдаст следующий вывод:

[1, 10]
[2, 20]
[3, 30]
[4, 40]
[5, 50]
[6, 60]
[7, 70]

const Observable = Rx.Observable;
const Subject = Rx.Subject;


const s1 = new Subject();
const s2 = new Subject();

Observable.zip(s1,s2).subscribe(console.log);

s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30); 
 
s2.next(40); s2.next(50); s2.next(60); s2.next(70); 
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>

Вывод, который я хотел бы получить:

[1, 10]
[3, 20]
[5, 30]

Объяснение:
с потерями zip оператор zipс размером буфера 1, Это означает, что он сохранит только первый элемент из потока, который выдал первый, и потеряет все остальные (элементы, которые поступают между первым элементом и первым излучением из второго потока). Итак, что происходит в примере: stream1излучает 1, с потерями почтовый индекс "запоминает" его и игнорирует все элементы на stream1 до тех пор stream2 излучает. Первая эмиссия stream2 является 10 так stream1теряет 2, После взаимной эмиссии (первая эмиссия с потерями zip) это начинается заново: "помни" 3"свободный" 4, испускают [3,20], Тогда начните сначала: "помните" 5 "свободный" 6а также 7, испускают [5,30], Тогда начните сначала: "помните" 40"свободный"50,60, 70и ждать следующего пункта наstream1,

Пример 2:

Stream1: 1 2 3 ... 100000000000
Stream2:                        a

регулярное zip Оператор взорвет память в этом случае.
Я не хочу этого.

Резюме:
По сути, я ожидаю потери zip оператор запоминает только первое значение, испускаемое stream 1 после предыдущей взаимной эмиссии и эмиссии, когда stream 2 догоняет stream 1, И повтори.

5 ответов

Решение

Следующее даст вам желаемое поведение:

Observable.zip(s1.take(1), s2.take(1)).repeat()

В RxJs 5.5 Синтаксис канала:

zip(s1.pipe(take(1)), s2.pipe(take(1))).pipe(repeat());

const s1 = new Rx.Subject();
const s2 = new Rx.Subject();

Rx.Observable.zip(s1.take(1), s2.take(1)).repeat()
    .subscribe(console.log);

s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);  
s2.next(40); s2.next(50); s2.next(60); s2.next(70); 
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>

Объяснение:

  • repeat оператор (в его текущей реализации) повторно подписывается на источник, наблюдаемый после его завершения, т.е. в данном конкретном случае он повторно подписывается на zip на каждую взаимную эмиссию.
  • zip объединяет две наблюдаемые и ждет, когда они оба испустят. combineLatest будет делать то же самое, это не имеет большого значения из-за take(1)
  • take(1) на самом деле заботится о взрыве памяти и определяет поведение с потерями

Если вы хотите получить последнее, а не первое значение из каждого потока при взаимном излучении, используйте это:

Observable.combineLatest(s1, s2).take(1).repeat()

В RxJs 5.5 Синтаксис канала:

combineLatest(s1.pipe(take(1)), s2.pipe(take(1))).pipe(repeat());

const s1 = new Rx.Subject();
const s2 = new Rx.Subject();

Rx.Observable.combineLatest(s1,s2).take(1).repeat()
    .subscribe(console.log);

s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);  
s2.next(40); s2.next(50); s2.next(60); s2.next(70); 
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>

Я думаю, что следующее должно всегда принимать последнее значение из каждого наблюдаемого источника.

const source1 = Observable.interval(1000).publish();
const source2 = Observable.interval(300).publish();

source1.connect();
source2.connect();

Observable.defer(() => Observable.forkJoin(
        source1.takeUntil(source2.skipUntil(source1)),
        source2.takeUntil(source1.skipUntil(source2))
    ))
    .take(1)
    .repeat()
    .subscribe(console.log);

Демонстрация в реальном времени: http://jsbin.com/vawewew/11/edit?js,console

Это печатает:

[ 0, 2 ]
[ 1, 5 ]
[ 2, 8 ]
[ 3, 12 ]
[ 4, 15 ]
[ 5, 18 ]

Вам может понадобиться включить source1 а также source2 в горячие Observables, если они еще не.

Редактировать:

Основная часть source1.takeUntil(source2.skipUntil(source1)), Это принимает значения от source1 до тех пор source2 излучает. Но в то же время он будет игнорировать source1 до тех пор source2 испускает хотя бы одно значение:).

forkJoin() Наблюдаемые работы ждут, пока оба источника не завершат, помня последнюю эмиссию от каждого из них.

Затем мы хотим повторить процесс, и поэтому мы используем take(1) завершить цепочку и .repeat() немедленно подписаться

Это дает последовательность [ 0, 2 ] [ 1, 5 ] [ 2, 8 ] [ 3, 12 ] ...

const interval1 = Rx.Observable.interval(1000)
const interval2 = Rx.Observable.interval(300)

const combined = Rx.Observable.combineLatest(interval1, interval2);
const fresh = combined.scan((acc, x) => { 
    return x[0] === acc[0] || x[1] === acc[1] ? acc : x 
  })
  .distinctUntilChanged() //fresh ones only

fresh.subscribe(console.log);

с возможно меньшим количеством операторов. Не уверен, насколько это эффективно.
CodePen

Для обновления № 3,

Тогда вам понадобится ключ для каждого исходного элемента.

// Simulated sources according to latest spec provided (update #3)
const source1 = Rx.Observable.from(['x','y','z'])
const source2 = Rx.Observable.from(['a','a','b','b','c'])

// Create keys for sources
let key1 = 0
let key2 = 0
const keyed1 = source1.map(x => [x, key1++])
const keyed2 = source2.map(x => [x, key2++])

const combined = Rx.Observable
  .combineLatest(keyed1, keyed2)
  .map(([keyed1, keyed2]) => [...keyed1, ...keyed2]) // to simplify scan below
combined.subscribe(console.log) // not the output, for illustration only
console.log('-------------------------------------')

const fresh = combined.scan((acc, x) => { 
    return x[1] === acc[1] || x[3] === acc[3] ? acc : x 
  })
  .distinctUntilChanged() //fresh ones only

const dekeyed = fresh
  .map(keyed => { return [keyed[0], keyed[2]] })
dekeyed.subscribe(console.log); // required output

Это производит

["x", "a"]  
["y", "a"]  
["z", "b"]  

CodePen (обновить страницу CodePen после открытия консоли, для лучшего отображения)

Вы упомянули размер буфера 1, задаваясь вопросом, будет ли это делать архивирование двух объектов ReplaySubjects с размером буфера 1?

Я добавляю другой ответ для ясности, поскольку он идет после принятого ответа (но основывается на моем предыдущем ответе).

Простите, если я неправильно понял, но я ожидал, что решение будет обрабатывать переключение скоростей излучения:

затем я переключаюсь между их скоростями излучения,

Поставляемый тест не переключает скорость излучения до тех пор, пока не остановится первый поток,

Stream1: 1 2    3 4    5 6 7                 
Stream2:     10     20    30 40 50 60 70

поэтому я попробовал еще один тест

Stream1: 1 2      3 4     5 6
Stream2:    10 20    30 40   50 60

Тестовые данные для этого потока

s1.next(1); s1.next(2); s2.next(10); s2.next(20); s1.next(3); s1.next(4);
s2.next(30); s2.next(40); s1.next(5); s1.next(6);  s2.next(50); s2.next(60);

Насколько я понимаю, принятый ответ не проходит этот тест.
Выводит

[1, 10]
[3, 20]
[4, 30]
[5, 40]
[6, 50]

тогда как я ожидал увидеть

[1, 10]
[3, 30]
[5, 50]

если оператор должен быть симметричным (коммутативным?)

Улучшение моего предыдущего ответа

Это решение построено на основе базовых операторов, поэтому его легче понять. Я не могу говорить об его эффективности, возможно, проверю это на другой итерации.

const s1 = new Rx.Subject();
const s2 = new Rx.Subject();

const tagged1 = s1.map(x=>[x,1])
const tagged2 = s2.map(x=>[x,2])
const merged = tagged1.merge(tagged2)
const fresh = merged.scan((acc, x) => { 
    return x[1] === acc[1] ? acc : x 
  })
  .distinctUntilChanged() //fresh ones only
const dekeyed = fresh.map(keyed => keyed[0])
const paired = dekeyed.pairwise()
let index = 0
const sequenced = paired.map(x=>[x,index++])
const alternates = sequenced.filter(x => x[1] % 2 === 0)
const deindexed = alternates.map(x=>x[0])

или в более компактной форме, если предпочтительнее

let index = 0
const output = 
  s1.map(x=>[x,1]).merge(s2.map(x=>[x,2])) // key by stream id
  .scan((acc, x) => { 
    return x[1] === acc[1] ? acc : x 
  })
  .distinctUntilChanged()       //fresh ones only
  .map(keyed => keyed[0])       // de-key
  .pairwise()                   // pair
  .map(x=>[x,index++])          // add a sequence no
  .filter(x => x[1] % 2 === 0)  // take even sequence
  .map(x=>x[0])                 // deindex

Для тестирования CodePen (обновите страницу CodePen после открытия консоли, для лучшего отображения)

Другие вопросы по тегам