RxJS 5, преобразование наблюдаемого в BehaviorSubject(?)
У меня есть наблюдаемый родитель, который, когда у него есть подписчик, выполнит поиск и выдаст одно значение, а затем завершит.
Я хотел бы преобразовать это в наблюдаемую (или предмет поведения или что-либо еще работающее), которая выполняет следующее: как только у него есть хотя бы один подписчик, он получает результат от родительской наблюдаемой (один раз). Затем он выдает это значение всем своим подписчикам, а также выдает это единственное значение всем будущим подписчикам, когда они подписываются. Это должно продолжаться с таким поведением, даже если число его подписчиков падает до нуля.
Кажется, это должно быть легко. Вот что не сработало:
theValue$: Observable<boolean> = parent$
.take(1)
.share()
Другие вещи, которые не работали: publishReplay()
, publish()
, Что-то, что сработало лучше:
theValue$ = new BehaviorSubject<boolean>(false);
parent$
.take(1)
.subscribe( value => theValue$.next(value));
Есть проблема с этим подходом, хотя: parent$
подписан на ранее theValue$
получает своего первого подписчика.
Есть ли лучший способ справиться с этим?
1 ответ
shareReplay
следует делать то, что вы хотите:
import 'rxjs/add/operator/shareReplay';
...
theValue$: Observable<boolean> = parent$.shareReplay(1);
shareReplay
был добавлен в RxJS версии 5.4.0. Возвращает подсчитанную наблюдаемую ссылку, которая подпишется на источник - parent$
- при первой подписке. А подписки, которые сделаны после завершения источника, будут получать повторные уведомления.
shareReplay
- а также refCount
в целом - более подробно объясняется в статье, которую я недавно написал: RxJS: Как использовать refCount.
Я реализовал метод для преобразования Observables в BehaviorSubjects, так как я думаю, что метод shareReplay не очень читабелен для использования в будущем.
import { Observable } from 'rxjs/Observable';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';
export function convertObservableToBehaviorSubject<T>(observable: Observable<T>, initValue: T): BehaviorSubject<T> {
const subject = new BehaviorSubject(initValue);
observable.subscribe(
(x: T) => {
subject.next(x);
},
(err: any) => {
subject.error(err);
},
() => {
subject.complete();
},
);
return subject;
}
Это улучшенный вариант ответа tmuechsch.
import { Observable, BehaviorSubject } from 'rxjs';
export function convertObservableToBehaviorSubject<T>(observable: Observable<T>, initValue: T): BehaviorSubject<T> {
const subject = new BehaviorSubject(initValue);
const subscription = observable.subscribe(subject);
return {
subject,
stopWatching: () => subscription.unsubscribe()
};
}
Будьте осторожны, потому что возвращенная тема никогда не отменяет подписку на наблюдаемый источник. Вам нужно позвонитьstopWatching
вручную, когда вы знаете, что больше нет ссылок на subject
(например, когда компонент представления уничтожен / размонтирован). В противном случае вы получите утечку памяти.
Это невозможно сделать абсолютно безопасное решение для данной проблемы. Причина в том, что субъект поведения имеетvalue
атрибут, который должен всегда обновляться, даже если тема не подписана, поэтому вы не можете отказаться от подписки observable
автоматически, когда все отписываются от subject
.
В cartant в растворе не является совершенным тоже, потому что результат не являетсяinstanceof BehaviorSubject
а также shareReplay
записывает значения только при подписке.