Реактивные Расширения: Как создать наблюдаемый заполнитель?

У меня есть метод, getObs(), который возвращает наблюдаемое, которое должно быть общим для всех абонентов. Однако эта наблюдаемая может не существовать, когда кто-то getObs()и его создание является асинхронной операцией, поэтому моя идея заключалась в том, чтобы вернуть наблюдаемую метку-заполнитель, которая будет заменена реальной наблюдаемой после ее создания.

Моя основная попытка выглядит примерно так:

var createSubject = new Rx.Subject();
var placeholder = createSubject.switchLatest();

Куда я могу вернуться placeholder если реальная наблюдаемая не существует, когда вызывается getObs(). Когда создается реальная наблюдаемая, я использую createSubject.onNext(realObservable), который затем передает его switchLatest() это разворачивает это для любых подписчиков.

Тем не менее, кажется чрезмерным использование Subject и switchLatest для этой цели, поэтому мне интересно, есть ли более прямое решение?

2 ответа

Решение

Если сам процесс получения наблюдаемого является асинхронным, вы должны смоделировать это также как наблюдаемое.

Например...

var getObsAsync = function () {
    return Rx.Observable.create(function (observer) {
        var token = startSomeAsyncAction(function (result) {
                // the async action has completed!
                var obs = Rx.Observable.fromArray(result.dataArray);
                token = undefined;
                observer.OnNext(obs);
                observer.OnCompleted();
            }),
            unsubscribeAction = function () {
                if (asyncAction) {
                    stopSomeAsyncAction(token);
                }
            };            

        return unsubscribeAction;
    });
};

var getObs = function () { return getObsAsync().switchLatest(); };

И если вы хотите поделиться одним экземпляром этого наблюдаемого, но не хотите получать наблюдаемое, пока кто-то на самом деле не подпишется, тогда вы делаете:

// source must be a Connectable Observable (ie the result of Publish or Replay)
// will connect the observable the first time an observer subscribes
// If an action is supplied, then it will call the action with a disposable
// that can be used to disconnect the observable.
// idea taken from Rxx project
Rx.Observable.prototype.prime = function (action) {
    var source = this;
    if (!(source instanceof Rx.Observable) || !source.connect) {
        throw new Error("source must be a connectable observable");
    }

    var connection = undefined;
    return Rx.Observable.createWithDisposable(function (observer) {
        var subscription = source.subscribe(observer);

        if (!connection) {
            // this is the first observer.  Connect the underlying observable.
            connection = source.connect();
            if (action) {
                // Call action with a disposable that will disconnect and reset our state
                var disconnect = function() {
                    connection.dispose();
                    connection = undefined;
                };
                action(Rx.Disposable.create(disconnect));
            }
        }

        return subscription;
    });
};

var globalObs = Rx.Observable.defer(getObs).publish().prime();

Теперь код везде, где можно просто использовать globalObs и не беспокоиться об этом:

// location 1
globalObs.subscribe(...);

// location 2
globalObs.select(...)...subscribe(...);

Обратите внимание, что никому на самом деле даже не нужно звонить getObs потому что вы просто установите глобальную наблюдаемую, которая будет (через defer) вызов getObs для вас, когда кто-то подписывается.

Вы можете использовать тему, чтобы подключить источник после факта:

var placeholder = new Subject<YourType>();
// other code can now subscribe to placeholder, best expose it as IObservable

когда источник создан:

var asyncCreatedObs = new ...;
placeholder.Subscribe(asyncCreatedObs);
// subscribers of placeholder start to see asyncCreatedObs 
Другие вопросы по тегам