Цепные обещания с RxJS

Я новичок в RxJS и FRP в целом. У меня была идея преобразовать существующую цепочку обещаний в моем приложении ExpressJS, чтобы ее можно было наблюдать на практике. Я знаю, что это, вероятно, не лучший пример, но, возможно, кто-то может помочь пролить свет.

Что я пытаюсь сделать:

  1. У меня есть два обещания - prom1 и prom2
  2. Я хочу, чтобы Prom1 запускался до Prom2
  3. Если prom1 отправляет отклонение (err), я хочу отменить prom2 до его запуска.
  4. Я хочу, чтобы сообщение об ошибке Prom1 возвращалось в метод onError на обозревателе.

var prom1 = new Promise(function(resolve, reject) {
    if (true) {
       reject('reason');
    }
    resolve(true);
});

var prom2 = new Promise(function(resolve, reject) {
    resolve(true);
});

// What do I do here? This is what I've tried so far...
var source1 = Rx.Observable.fromPromise(prom1);
var source2 = source1.flatMap(Rx.Observable.fromPromise(prom2));

var subscription = source2.subscribe(
    function (result) { console.log('Next: ' + result); },

    // I want my error 'reason' to be made available here
    function (err) { console.log('Error: ' + err); },

    function () { console.log('Completed'); });

3 ответа

Решение

Если я понял, что вы пытаетесь сделать - вам нужно создать две отложенные наблюдаемые функции из функций, которые возвращают обещания и объединяют их:

var shouldFail = false;

function action1() {
    return new Promise(function (resolve, reject) {    
        console.log('start action1');
        if (shouldFail) {
            reject('reason');
        }
        resolve(true);
    });
}

function action2() {
    return new Promise(function (resolve, reject) {    
        console.log('start action2');
        resolve(true);
    });
}

var source1 = Rx.Observable.defer(action1);
var source2 = Rx.Observable.defer(action2);

var combination = Rx.Observable.concat(source1, source2);

var logObserver = Rx.Observer.create(

function (result) {
    console.log('Next: ' + result);
},

function (err) {
    console.log('Error: ' + err);
},

function () {
    console.log('Completed');
});

тогда для нормального случая:

combination.subscribe(logObserver);
// start action1
// Next: true
// start action2
// Next: true
// Completed

И случай, когда первое обещание не выполняется:

shouldFail = true;
combination.subscribe(logObserver);
// start action1
// Error: reason

http://jsfiddle.net/cL37tgva/

flatMap превращает наблюдаемую из наблюдаемых в наблюдаемую. Он используется во многих примерах с Обещаниями, потому что часто у вас есть наблюдаемое, а в функции карты вы хотите создать обещание для каждого "элемента" наблюдаемых выбросов. Потому что каждый вызов fromPromise создает новый Observable, что делает его "наблюдаемым из наблюдаемых". flatMap уменьшает это до "плоской" наблюдаемой.

В вашем примере вы делаете что-то другое, вы превращаете одно обещание в наблюдаемое и хотите связать его с другим наблюдаемым (также созданным из одного обещания). Concat делает то, что вы ищете, он объединяет две наблюдаемые вместе.

В случае ошибки будет работать, как вы ожидаете.

Observable.forkJoin отлично работает здесь, получая множество других Observables.

Rx.Observable.forkJoin([this.http.get('http://jsonplaceholder.typicode.com/posts'), this.http.get('http://jsonplaceholder.typicode.com/albums')]).subscribe((data) => {
      console.log(data);
    });
Другие вопросы по тегам