RxJS 5, преобразование наблюдаемого в BehaviorSubject(?)

У меня есть наблюдаемый родитель, который, когда у него есть подписчик, выполнит поиск и выдаст одно значение, а затем завершит.

Я хотел бы преобразовать это в наблюдаемую (или предмет поведения или что-либо еще работающее), которая выполняет следующее: как только у него есть хотя бы один подписчик, он получает результат от родительской наблюдаемой (один раз). Затем он выдает это значение всем своим подписчикам, а также выдает это единственное значение всем будущим подписчикам, когда они подписываются. Это должно продолжаться с таким поведением, даже если число его подписчиков падает до нуля.

Кажется, это должно быть легко. Вот что не сработало:

theValue$: Observable<boolean> = parent$
.take(1)
.share()

Другие вещи, которые не работали: publishReplay(), publish(), Что-то, что сработало лучше:

theValue$ = new BehaviorSubject<boolean>(false);

parent$
.take(1)
.subscribe( value => theValue$.next(value));

Есть проблема с этим подходом, хотя: parent$ подписан на ранее theValue$ получает своего первого подписчика.

Есть ли лучший способ справиться с этим?

1 ответ

Решение

shareReplay следует делать то, что вы хотите:

import 'rxjs/add/operator/shareReplay';
...
theValue$: Observable<boolean> = parent$.shareReplay(1);

shareReplay был добавлен в RxJS версии 5.4.0. Возвращает подсчитанную наблюдаемую ссылку, которая подпишется на источник - parent$ - при первой подписке. А подписки, которые сделаны после завершения источника, будут получать повторные уведомления.

shareReplay - а также refCount в целом - более подробно объясняется в статье, которую я недавно написал: RxJS: Как использовать refCount.

Я реализовал метод для преобразования Observables в BehaviorSubjects, так как я думаю, что метод shareReplay не очень читабелен для использования в будущем.

import { Observable } from 'rxjs/Observable';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';

export function convertObservableToBehaviorSubject<T>(observable: Observable<T>, initValue: T): BehaviorSubject<T> {
    const subject = new BehaviorSubject(initValue);

    observable.subscribe(
        (x: T) => {
            subject.next(x);
        },
        (err: any) => {
            subject.error(err);
        },
        () => {
            subject.complete();
        },
    );

    return subject;
}

Это улучшенный вариант ответа tmuechsch.

import { Observable, BehaviorSubject } from 'rxjs';

export function convertObservableToBehaviorSubject<T>(observable: Observable<T>, initValue: T): BehaviorSubject<T> {
  const subject = new BehaviorSubject(initValue);
  const subscription = observable.subscribe(subject);
  return {
    subject,
    stopWatching: () => subscription.unsubscribe()
  };
}

Будьте осторожны, потому что возвращенная тема никогда не отменяет подписку на наблюдаемый источник. Вам нужно позвонитьstopWatching вручную, когда вы знаете, что больше нет ссылок на subject(например, когда компонент представления уничтожен / размонтирован). В противном случае вы получите утечку памяти.

Это невозможно сделать абсолютно безопасное решение для данной проблемы. Причина в том, что субъект поведения имеетvalue атрибут, который должен всегда обновляться, даже если тема не подписана, поэтому вы не можете отказаться от подписки observable автоматически, когда все отписываются от subject.

В cartant в растворе не является совершенным тоже, потому что результат не являетсяinstanceof BehaviorSubject а также shareReplay записывает значения только при подписке.

Другие вопросы по тегам