Получить подписчиков Observable и заставить их подписаться на другую подписку Observable

Проще говоря

Учитывая существующую Наблюдаемую (которая еще не завершена), есть ли способ извлечь связанных подписчиков (функции, переданные для подписки), чтобы вместо этого подписаться на другую Наблюдаемую?

контекст

Служба в моем приложении помогает создавать соединения SeverEvent, возвращать ConnectableObservable прокси-соединению и разрешать многоадресную передачу с помощью оператора публикации. Сервис отслеживает существующие соединения через внутреннее хранилище:

store: {[key: string]: ConnectionTracker};

// …

interface ConnectionTracker {
    url: string;
    eventSource: EventSource;
    observable: rx.ConnectableObservable<any>;
    subscription: rx.Subscription;
    observer: rx.Observer<any>;
    data?: any; // Arbitrary data
}

После создания соединения, если связанный трекер уже существует (идентификация выполняется с использованием конечной точки соединения), сервис должен:

  • ok Закрыть соединение с существующим трекером ServerEvent
  • ok Откройте новое соединение SerevrEvent (отсюда и новый ConnectableObservable)
  • Замените существующий трекер Observable свежим наблюдаемым, но вместо этого заставьте существующих подписчиков подписаться на новый Observable

Вот часть кода, которая создает ConnectionTracker s

/**
* Create/Update a ServerEvent connection tracker
*/
createTracker<T>(endpoint: string, queryString: string = null): ConnectionTracker
{
    let fullUri = endpoint + (queryString ? `?${queryString}` : '')
        , tracker = this.findTrackerByEndpoint(endpoint) || {
            observable: null,
            fullUri: fullUri,
            eventSource: null,
            observer: null,
            subscription: null
        }
    ;

    // Tracker exists
    if (tracker.observable !== null) {
        // If fullUri hasn't changed, use the tracker as is
        if (tracker.fullUri === fullUri) {
            return tracker;
        }

        // At this point, we know "fullUri" has changed, the tracker's
        // connection should be replaced with a fresh one

// ⇒ TODO
// ⇒ Gather old tracker.observable's subscribers/subscriptions to make
//   them subscribe to the new Observable instead (created down below)

        // Terminate previous connection and clean related resouces
        tracker.observer.complete();
        tracker.eventSource.close();
    }

    tracker.eventSource = new EventSource(<any>fullUri, {withCredentials: true});
    tracker.observable = rx.Observable.create((observer: rx.Observer<T>) => {
            // Executed once
            tracker.eventSource.onmessage = e => observer.next(JSON.parse(e.data));
            tracker.eventSource.onerror = e => observer.error(e);
            // Keep track of the observer
            tracker.observer = observer;
        })
        // Transform Observable into a ConnectableObservable for multicast
        .publish()
    ;

    // Start emitting right away and also keep a reference to 
    // proxy subscription for later disposal
    tracker.subscription = tracker.observable.connect();

    return tracker;
}

Спасибо.

2 ответа

Решение

Если вы пытаетесь сделать что-то вроде перемещения подписчика в другую наблюдаемую область, то вы просто не делаете то, что задумано в RxJS. Любая такая манипуляция в основном является взломом.

Если вы время от времени создаете новую наблюдаемую информацию (например, отправляя запрос) и хотите, чтобы какой-то подписчик всегда был подписан на самую последнюю из них, то вот решение:

  private observables: Subject<Observable<Data>> = new Subject();

  getData(): Observable<Data> {
    return this.observables.pipe(switchAll());
  }

  onMakingNewRequest(newObservable: Observable<Data>) {
    this.observables.push(newObservable);
  }

Таким образом, вы можете выставить одну наблюдаемую (через getData()) на которую подписывается клиент, но нажав this.observables Вы изменяете фактический источник данных, которые видит пользователь.

Что касается закрытия соединения и тому подобного, то ваша наблюдаемая (та, которая создается с каждым запросом или чем-то еще) должна в основном заботиться о выпуске и закрытии вещей, когда они отписаны, тогда вам не нужно делать никакой дополнительной обработки, предыдущую наблюдаемую будет автоматически отписаться с момента нажатия новой. Детали зависят от фактического бэкэнда, с которым вы связываетесь.

Вместо того, чтобы пытаться переносить подписчиков из одной наблюдаемой в другую вручную, вы должны предоставить слушателям наблюдаемую, которая автоматически переключится на другую наблюдаемую при необходимости.

Вы делаете это, работая с Observable высокого порядка (Observable, который испускает Observables), который всегда переключается на самую последнюю внутреннюю Observable.

Основная концепция

// a BehaviorSubject is used so that late subscribers also immediately get the most recent inner Observable
const higherOrderObservable = new BehaviorSubject<Observable<any>>(EMPTY);

// pass new Observable to listeners
higherOrderObservable.next(new Observable(..));

// get most recent inner Observable
const currentObservable = higherOrderObservable.pipe(switchMap(obs => obs));
currentObservable.subscribe(valueFromInnerObservable => { .. })

В твоем случае

Для каждой конечной точки создайте BehaviorSubject (поставщик трекера), который испускает Observable (трекер), который в настоящее время должен использоваться для этой конечной точки. Когда для данной конечной точки следует использовать другой трекер, передайте эту новую наблюдаемую BehaviorSubject, Пусть ваши слушатели подписываются на BehaviorSubject (поставщик трекера), который автоматически снабжает их нужным трекером, т. е. переключается на наблюдаемый объект, который должен использоваться в настоящее время.

Упрощенная версия вашего кода может выглядеть так, как показано ниже. Особенности зависят от того, как вы используете функцию createTracker по всему вашему приложению.

interface ConnectionTracker {
  fullUri: string;
  tracker$: ConnectableObservable<any>;
}

// Map an endpoint to a tracker supplier.
// This is your higher order Observable as it emits objects that wrap an Observable
store: { [key: string]: BehaviorSubject<ConnectionTracker> };
closeAllTrackers$ = new Subject();

// Creates a new tracker if necessary and returns a ConnectedObservable for that tracker. 
// The ConnectedObservable will always resemble the current tracker.
createTracker<T>(endpoint: string, queryString: string = null): Observable<any> {
  const fullUri = endpoint + (queryString ? `?${queryString}` : '');
  // if no tracker supplier for the endpoint exists, create one
  if (!store[endpoint]) {
    store[endpoint] = new BehaviorSubject<ConnectionTracker>(null);
  }
  const currentTracker = store[endpoint].getValue();

  // if no tracker exists or the current one is obsolete, create a new one
  if (!currentTracker || currentTracker.fullUri !== fullUri) {
    const tracker$ = new Observable<T>(subscriber => {
      const source = new EventSource(fullUri, { withCredentials: true });
      source.onmessage = e => subscriber.next(JSON.parse(e.data));
      source.onerror = e => subscriber.error(e);
      return () => source.close(); // on unsubscribe close the source
    }).pipe(publish()) as ConnectableObservable<any>;
    tracker$.connect();
    // pass the new tracker to the tracker supplier
    store[endpoint].next({ fullUri, tracker$ });
  }
  // return the tracker supplier for the given endpoint that always switches to the current tracker
  return store[endpoint].pipe(
    switchMap(tracker => tracker ? tracker.tracker$ : EMPTY), // switchMap will unsubscribe from the previous tracker and thus close the connection if a new tracker comes in
    takeUntil(this.closeAllTrackers$) // complete the tracker supplier on emit
  );
}

// close all trackers and remove the tracker suppliers
closeAllTrackers() {
  this.closeAllTrackers$.next();
  this.store = {};
}

Если вы хотите закрыть все подключения трекера одновременно и существующие подписчики должны получить complete уведомление, звонок closeAllTrackers, Если вы хотите только закрыть некоторые подключения трекера, но не хотите, чтобы существующие подписчики получили complete уведомление, чтобы они продолжали слушать новые трекеры, поставляемые в будущем, звоните store[trackerEndpoint].next(null) для каждого трекера.

Другие вопросы по тегам