Rx.Subject проигрывает события

Кто-нибудь может объяснить, в чем разница между этими 3 вариантами?

http://jsfiddle.net/8vx2g3fr/2/

  1. Сначала работает как следует, все события обрабатываются.
  2. Но второй проигрывает последнее событие (3)
  3. Третье проигрывает второе событие (2)

Не могли бы вы помочь мне понять, в чем проблема и как заставить третий вариант обрабатывать все события?

1

let bs = new Rx.Subject();
bs
    .subscribe(v=>{
        console.log("in", v);
        if (v % 2 == 0) {
            setTimeout(()=>{
                console.log(" out", v, "->" , v + 1);
                bs.next(v+1);
            }, 0);
        }
    });

bs.next(0);
bs.next(2);

Выход:

in 0
in 2
 out 0 -> 1
in 1
 out 2 -> 3
in 3

2

let bs2 = new Rx.Subject();
bs2
    .subscribe(v=>{
        console.log("in", v);
        if (v % 2 == 0) {            
            Rx.Observable.interval(0).take(1)
                .map(()=>{console.log(" out", v, "->" , v + 1);return v+1;})
                .subscribe(bs2);
        }
    });

bs2.next(0);
bs2.next(2);

Выход:

in 0
in 2
 out 0 -> 1
in 1
 out 2 -> 3

3

let bs3 = new Rx.Subject();
bs3
    .switchMap(v=>{
        console.log("in", v);
        if (v % 2 == 0) {            
            return Rx.Observable.interval(0).take(1)
                .map(()=>{console.log(" out", v, "->" , v + 1);return v+1;});
        }

    return Rx.Observable.empty();     
    }).subscribe(bs3);

bs3.next(0);
bs3.next(2);

Выход:

in 0
in 2
 out 2 -> 3
in 3

1 ответ

Решение

Это все на самом деле ожидаемое поведение.

Смущает то, что происходит, когда вы используете повторно Subjectи оператор, такой как take() многократно.

оператор take(1) принимает только одно значение и отправить complete уведомление. Это уведомление получено Subject потому что .subscribe(bs2), Теперь самое важное.
Когда Subject получает complete или же error уведомление он помечает себя как остановленный. Это означает, что он никогда не будет отправлять какие-либо элементы или уведомления, что является правильным и ожидаемым поведением в Rx. Уведомления complete или же error должны быть последними выбросами.

Итак Subject завершен первым take(1) который запускается по значению 0 (bs2.next(0) вызов).

Тогда когда значение 2 запускает второй прогон Observable.interval(0).take(1) это получено Subject но это автоматически игнорируется, потому что Subject уже помечен как остановленный.

Процесс в третьем демо точно такой же.

Вы можете увидеть это в исходном коде в Subject.ts:

Другие вопросы по тегам