Получить подписчиков Observable и заставить их подписаться на другую подписку Observable
Проще говоря
Учитывая существующую Наблюдаемую (которая еще не завершена), есть ли способ извлечь связанных подписчиков (функции, переданные для подписки), чтобы вместо этого подписаться на другую Наблюдаемую?
контекст
Служба в моем приложении помогает создавать соединения SeverEvent, возвращать ConnectableObservable прокси-соединению и разрешать многоадресную передачу с помощью оператора публикации. Сервис отслеживает существующие соединения через внутреннее хранилище:
store: {[key: string]: ConnectionTracker};
// …
interface ConnectionTracker {
url: string;
eventSource: EventSource;
observable: rx.ConnectableObservable<any>;
subscription: rx.Subscription;
observer: rx.Observer<any>;
data?: any; // Arbitrary data
}
После создания соединения, если связанный трекер уже существует (идентификация выполняется с использованием конечной точки соединения), сервис должен:
- ok
Закрыть соединение с существующим трекером ServerEvent - ok
Откройте новое соединение SerevrEvent (отсюда и новый ConnectableObservable) - Замените существующий трекер Observable свежим наблюдаемым, но вместо этого заставьте существующих подписчиков подписаться на новый Observable
Вот часть кода, которая создает ConnectionTracker s
/**
* Create/Update a ServerEvent connection tracker
*/
createTracker<T>(endpoint: string, queryString: string = null): ConnectionTracker
{
let fullUri = endpoint + (queryString ? `?${queryString}` : '')
, tracker = this.findTrackerByEndpoint(endpoint) || {
observable: null,
fullUri: fullUri,
eventSource: null,
observer: null,
subscription: null
}
;
// Tracker exists
if (tracker.observable !== null) {
// If fullUri hasn't changed, use the tracker as is
if (tracker.fullUri === fullUri) {
return tracker;
}
// At this point, we know "fullUri" has changed, the tracker's
// connection should be replaced with a fresh one
// ⇒ TODO
// ⇒ Gather old tracker.observable's subscribers/subscriptions to make
// them subscribe to the new Observable instead (created down below)
// Terminate previous connection and clean related resouces
tracker.observer.complete();
tracker.eventSource.close();
}
tracker.eventSource = new EventSource(<any>fullUri, {withCredentials: true});
tracker.observable = rx.Observable.create((observer: rx.Observer<T>) => {
// Executed once
tracker.eventSource.onmessage = e => observer.next(JSON.parse(e.data));
tracker.eventSource.onerror = e => observer.error(e);
// Keep track of the observer
tracker.observer = observer;
})
// Transform Observable into a ConnectableObservable for multicast
.publish()
;
// Start emitting right away and also keep a reference to
// proxy subscription for later disposal
tracker.subscription = tracker.observable.connect();
return tracker;
}
Спасибо.
2 ответа
Если вы пытаетесь сделать что-то вроде перемещения подписчика в другую наблюдаемую область, то вы просто не делаете то, что задумано в RxJS. Любая такая манипуляция в основном является взломом.
Если вы время от времени создаете новую наблюдаемую информацию (например, отправляя запрос) и хотите, чтобы какой-то подписчик всегда был подписан на самую последнюю из них, то вот решение:
private observables: Subject<Observable<Data>> = new Subject();
getData(): Observable<Data> {
return this.observables.pipe(switchAll());
}
onMakingNewRequest(newObservable: Observable<Data>) {
this.observables.push(newObservable);
}
Таким образом, вы можете выставить одну наблюдаемую (через getData()
) на которую подписывается клиент, но нажав this.observables
Вы изменяете фактический источник данных, которые видит пользователь.
Что касается закрытия соединения и тому подобного, то ваша наблюдаемая (та, которая создается с каждым запросом или чем-то еще) должна в основном заботиться о выпуске и закрытии вещей, когда они отписаны, тогда вам не нужно делать никакой дополнительной обработки, предыдущую наблюдаемую будет автоматически отписаться с момента нажатия новой. Детали зависят от фактического бэкэнда, с которым вы связываетесь.
Вместо того, чтобы пытаться переносить подписчиков из одной наблюдаемой в другую вручную, вы должны предоставить слушателям наблюдаемую, которая автоматически переключится на другую наблюдаемую при необходимости.
Вы делаете это, работая с Observable высокого порядка (Observable, который испускает Observables), который всегда переключается на самую последнюю внутреннюю Observable.
Основная концепция
// a BehaviorSubject is used so that late subscribers also immediately get the most recent inner Observable
const higherOrderObservable = new BehaviorSubject<Observable<any>>(EMPTY);
// pass new Observable to listeners
higherOrderObservable.next(new Observable(..));
// get most recent inner Observable
const currentObservable = higherOrderObservable.pipe(switchMap(obs => obs));
currentObservable.subscribe(valueFromInnerObservable => { .. })
В твоем случае
Для каждой конечной точки создайте BehaviorSubject
(поставщик трекера), который испускает Observable (трекер), который в настоящее время должен использоваться для этой конечной точки. Когда для данной конечной точки следует использовать другой трекер, передайте эту новую наблюдаемую BehaviorSubject
, Пусть ваши слушатели подписываются на BehaviorSubject
(поставщик трекера), который автоматически снабжает их нужным трекером, т. е. переключается на наблюдаемый объект, который должен использоваться в настоящее время.
Упрощенная версия вашего кода может выглядеть так, как показано ниже. Особенности зависят от того, как вы используете функцию createTracker
по всему вашему приложению.
interface ConnectionTracker {
fullUri: string;
tracker$: ConnectableObservable<any>;
}
// Map an endpoint to a tracker supplier.
// This is your higher order Observable as it emits objects that wrap an Observable
store: { [key: string]: BehaviorSubject<ConnectionTracker> };
closeAllTrackers$ = new Subject();
// Creates a new tracker if necessary and returns a ConnectedObservable for that tracker.
// The ConnectedObservable will always resemble the current tracker.
createTracker<T>(endpoint: string, queryString: string = null): Observable<any> {
const fullUri = endpoint + (queryString ? `?${queryString}` : '');
// if no tracker supplier for the endpoint exists, create one
if (!store[endpoint]) {
store[endpoint] = new BehaviorSubject<ConnectionTracker>(null);
}
const currentTracker = store[endpoint].getValue();
// if no tracker exists or the current one is obsolete, create a new one
if (!currentTracker || currentTracker.fullUri !== fullUri) {
const tracker$ = new Observable<T>(subscriber => {
const source = new EventSource(fullUri, { withCredentials: true });
source.onmessage = e => subscriber.next(JSON.parse(e.data));
source.onerror = e => subscriber.error(e);
return () => source.close(); // on unsubscribe close the source
}).pipe(publish()) as ConnectableObservable<any>;
tracker$.connect();
// pass the new tracker to the tracker supplier
store[endpoint].next({ fullUri, tracker$ });
}
// return the tracker supplier for the given endpoint that always switches to the current tracker
return store[endpoint].pipe(
switchMap(tracker => tracker ? tracker.tracker$ : EMPTY), // switchMap will unsubscribe from the previous tracker and thus close the connection if a new tracker comes in
takeUntil(this.closeAllTrackers$) // complete the tracker supplier on emit
);
}
// close all trackers and remove the tracker suppliers
closeAllTrackers() {
this.closeAllTrackers$.next();
this.store = {};
}
Если вы хотите закрыть все подключения трекера одновременно и существующие подписчики должны получить complete
уведомление, звонок closeAllTrackers
, Если вы хотите только закрыть некоторые подключения трекера, но не хотите, чтобы существующие подписчики получили complete
уведомление, чтобы они продолжали слушать новые трекеры, поставляемые в будущем, звоните store[trackerEndpoint].next(null)
для каждого трекера.