Как получить текущее значение RxJS Subject или Observable?

У меня есть сервис Angular 2:

import {Storage} from './storage';
import {Injectable} from 'angular2/core';
import {Subject}    from 'rxjs/Subject';

@Injectable()
export class SessionStorage extends Storage {
  private _isLoggedInSource = new Subject<boolean>();
  isLoggedIn = this._isLoggedInSource.asObservable();
  constructor() {
    super('session');
  }
  setIsLoggedIn(value: boolean) {
    this.setItem('_isLoggedIn', value, () => {
      this._isLoggedInSource.next(value);
    });
  }
}

Все отлично работает. Но у меня есть другой компонент, которому не нужно подписываться, ему просто нужно получить текущее значение isLoggedIn в определенный момент времени. Как я могу это сделать?

13 ответов

Решение

Subject или же Observable не имеет текущего значения. Когда значение передается, оно передается подписчикам и Observable сделано с этим

Если вы хотите иметь текущее значение, используйте BehaviorSubject который предназначен именно для этой цели. BehaviorSubject сохраняет последнее переданное значение и немедленно отправляет его новым подписчикам.

У этого также есть метод getValue() чтобы получить текущее значение.

Единственный способ, которым вы должны получать значения "из" Observable/Subject - это подписаться!

Если вы используете getValue() вы делаете что-то обязательное в декларативной парадигме. Он есть в качестве аварийного люка, но 99,9% времени НЕ следует использовать getValue() , Есть несколько интересных вещей, которые getValue() будет делать: он выдаст ошибку, если субъект был отписан, он не даст вам получить значение, если субъект мертв, потому что он ошибочен и т. д. Но, опять же, он используется в качестве аварийного люка для редких обстоятельств.

Существует несколько способов получения последнего значения от субъекта или наблюдаемого способом "Rx-y":

  1. С помощью BehaviorSubject: Но на самом деле подписаться на это. Когда вы впервые подписываетесь на BehaviorSubject он будет синхронно отправлять предыдущее значение, которое он получил или был инициализирован.
  2. Используя ReplaySubject(N): Это будет кеш N значения и воспроизвести их для новых подписчиков.
  3. A.withLatestFrom(B): Используйте этот оператор, чтобы получить самое последнее значение из наблюдаемого B когда наблюдаемый A излучает. Даст вам оба значения в массиве [a, b],
  4. A.combineLatest(B): Используйте этот оператор, чтобы получить самые последние значения из A а также B каждый раз либо A или же B излучает. Даст вам оба значения в массиве.
  5. shareReplay(): Делает наблюдаемую многоадресную рассылку через ReplaySubject, но позволяет повторить наблюдаемое при ошибке. (В основном это дает вам обещанное поведение кеширования).
  6. publishReplay(), publishBehavior(initialValue), multicast(subject: BehaviorSubject | ReplaySubject) и т. д. Другие операторы, которые используют BehaviorSubject а также ReplaySubject, Различные разновидности одной и той же вещи, они в основном направляют источник, наблюдаемый путем направления всех уведомлений через тему. Вам нужно позвонить connect() подписаться на источник с темой.

У меня была похожая ситуация, когда поздние подписчики подписывались на Предмет после того, как его ценность прибыла.

Я обнаружил, что ReplaySubject, который похож на BehaviorSubject, работает как шарм в этом случае. А вот ссылка на лучшее объяснение: http://reactivex.io/rxjs/manual/overview.html

const observable = of('response')

function hasValue(value: any) {
  return value !== null && value !== undefined;
}

function getValue<T>(observable: Observable<T>): Promise<T> {
  return observable
    .pipe(
      filter(hasValue),
      first()
    )
    .toPromise();
}

const result = await getValue(observable)
// Do the logic with the result
// .................
// .................
// .................

Вы можете проверить полную статью о том, как реализовать это здесь. https://www.imkrish.com/how-to-get-current-value-of-observable-in-a-clean-way/

Аналогичный ответ был отклонен. Но я думаю, что могу оправдать то, что я предлагаю здесь для ограниченных случаев.


Хотя верно, что наблюдаемая не имеет текущего значения, очень часто она будет иметь сразу доступное значение. Например, в хранилищах redux / flux / akita вы можете запрашивать данные из центрального хранилища, основываясь на количестве наблюдаемых, и это значение обычно будет сразу же доступно.

Если это так, то когда вы subscribe, значение вернется немедленно.

Допустим, у вас был вызов в службу, и по окончании вы хотите получить из вашего магазина самую свежую ценность, которая потенциально может не генерироваться:

Вы можете попытаться сделать это (и вы должны как можно больше держать вещи "в трубах"):

 serviceCallResponse$.pipe(withLatestFrom(store$.select(x => x.customer)))
                     .subscribe(([ serviceCallResponse, customer] => {

                        // we have serviceCallResponse and customer 
                     });

Проблема в том, что он будет блокироваться до тех пор, пока вторичная наблюдаемая не выдаст значение, которое потенциально может никогда не быть.

Недавно я обнаружил, что мне нужно оценивать наблюдаемое только в том случае, если значение было немедленно доступно, и, что более важно, мне нужно было уметь определять, не было ли оно. Я закончил тем, что сделал это:

 serviceCallResponse$.pipe()
                     .subscribe(serviceCallResponse => {

                        // immediately try to subscribe to get the 'available' value
                        // note: immediately unsubscribe afterward to 'cancel' if needed
                        let customer = undefined;

                        // whatever the secondary observable is
                        const secondary$ = store$.select(x => x.customer);

                        // subscribe to it, and assign to closure scope
                        sub = secondary$.pipe(take(1)).subscribe(_customer => customer = _customer);
                        sub.unsubscribe();

                        // if there's a delay or customer isn't available the value won't have been set before we get here
                        if (customer === undefined) 
                        {
                           // handle, or ignore as needed
                           return throwError('Customer was not immediately available');
                        }
                     });

Обратите внимание, что для всего вышеперечисленного я использую subscribe чтобы получить значение (как обсуждает @Ben). Не используя .value собственность, даже если бы у меня был BehaviorSubject,

Я сталкивался с той же проблемой в дочерних компонентах, где первоначально он должен был бы иметь текущее значение Subject, а затем подписаться на Subject для прослушивания изменений. Я просто поддерживаю текущее значение в Сервисе, чтобы оно было доступно для компонентов, например:

import {Storage} from './storage';
import {Injectable} from 'angular2/core';
import {Subject}    from 'rxjs/Subject';

@Injectable()
export class SessionStorage extends Storage {

  isLoggedIn: boolean;

  private _isLoggedInSource = new Subject<boolean>();
  isLoggedIn = this._isLoggedInSource.asObservable();
  constructor() {
    super('session');
    this.currIsLoggedIn = false;
  }
  setIsLoggedIn(value: boolean) {
    this.setItem('_isLoggedIn', value, () => {
      this._isLoggedInSource.next(value);
    });
    this.isLoggedIn = value;
  }
}

Компонент, которому нужно текущее значение, может просто получить к нему доступ из службы, то есть:

sessionStorage.isLoggedIn

Не уверен, что это правильная практика:)

Подписку можно создать и после взятия первого отправленного элемента уничтожить. Pipe - это функция, которая использует Observable в качестве входных данных и возвращает другой Observable в качестве выходных данных, не изменяя при этом первую наблюдаемую. Угловой 8.1.0. Пакеты:"rxjs": "6.5.3", "rxjs-observable": "0.0.7"

  ngOnInit() {

    ...

    // If loading with previously saved value
    if (this.controlValue) {

      // Take says once you have 1, then close the subscription
      this.selectList.pipe(take(1)).subscribe(x => {
        let opt = x.find(y => y.value === this.controlValue);
        this.updateValue(opt);
      });

    }
  }

Хотя это может показаться излишним, это всего лишь еще одно "возможное" решение, позволяющее сохранить тип Observable и уменьшить количество шаблонов...

Вы всегда можете создать расширение для получения текущего значения Observable.

Для этого вам необходимо продлить Observable<T> интерфейс в global.d.ts файл объявления печатных машин. Затем реализовать расширение геттер в observable.extension.ts файл и, наконец, включить в ваше приложение как набор текста, так и файл расширения.

Вы можете обратиться к этому ответу Stackru, чтобы узнать, как включить расширения в ваше приложение Angular.

// global.d.ts
declare module 'rxjs' {
  interface Observable<T> {
    /**
     * _Extension Method_ - Returns current value of an Observable.
     * Value is retrieved using _first()_ operator to avoid the need to unsubscribe.
     */
    value: Observable<T>;
  }
}

// observable.extension.ts
Object.defineProperty(Observable.prototype, 'value', {
  get <T>(this: Observable<T>): Observable<T> {
    return this.pipe(
      filter(value => value !== null && value !== undefined),
      first());
  },
});

// using the extension getter example
this.myObservable$.value
  .subscribe(value => {
    // whatever code you need...
  });

Этого можно добиться двумя способами.

  1. BehaviorSubject имеет метод getValue() которое вы можете получить в определенный момент времени.

  2. Вы можете подписаться напрямую с помощью BehaviorSubject и передать подписанное значение члену класса, полю или свойству.

Я бы не рекомендовал оба подхода.

В первом подходе это удобный метод, вы можете получить значение в любое время, вы можете называть его текущим снимком на тот момент времени. Проблема в том, что вы можете ввести в свой код условия гонки, вы можете вызывать этот метод во многих разных местах и ​​в разное время, что трудно отлаживать.

Второй подход - это то, что используют большинство разработчиков, когда они хотят получить необработанное значение при подписке, вы можете отслеживать подписку, и когда вы точно отказываетесь от подписки, чтобы избежать дальнейшей утечки памяти, вы можете использовать это, если действительно отчаянно хотите привязать его к переменной. и нет других способов связать это.

Я бы порекомендовал, еще раз взглянув на ваши варианты использования, где вы его используете? Например, вы хотите определить, вошел ли пользователь в систему или нет, когда вы вызываете какой-либо API, вы можете объединить его с другими наблюдаемыми:

      const data$ = apiRequestCall$().pipe(
 // Latest snapshot from BehaviorSubject.
 withLatestFrom(isLoggedIn),
 // Allow only call if loggedin.
 filter(([request, loggedIn) => loggedIn)
 // Do something else..
)

При этом вы можете использовать его непосредственно в пользовательском интерфейсе, подключив data$ | async в случае углового.

Другой подход. Если вы хотите/можете использовать асинхронное ожидание (должно быть внутри асинхронных функций), вы можете сделать это с помощью современных Rxjs:

       async myFunction () {
     const currentValue = await firstValueFrom(
      of(0).pipe(
        withLatestFrom(this.yourObservable$),
        map((tuple) => tuple[1]),
        take(1)
      )
    );
    // do stuff with current value

 }

 

Это выдаст значение «Сразу же» из-заwithLatestFrom, а затем разрешит обещание.

Наблюдаемые — это просто функции. Они возвращают поток значений, но чтобы увидеть эти значения, вам необходимо на них подписаться. Самый простой способ — позвонитьsubscribe()метод для наблюдаемого и передать наблюдателя илиnext()обратный вызов в качестве аргумента.

В качестве альтернативы вы можете использовать подписчика, который является реализацией Observer, но предоставляет вам дополнительные возможности, такие какunsubscribe.

      import { Subscription } from 'rxjs';

export class SessionStorage extends Storage {
  private _isLoggedInSource = new Subject<boolean>();
  privat _subs = new Subscription();
  isLoggedIn$ = this._isLoggedInSource.asObservable();
  isLoggedIn = false;
  constructor() {
    super('session');
    this._subs.add(this.isLoggedIn$.subscribe((value) => this.isLoggedIn = v))
  }
  setIsLoggedIn(value: boolean) {
    this.setItem('_isLoggedIn', value, () => {
      this._isLoggedInSource.next(value);
    });
  }
}

Затем в другом компоненте вы можете импортироватьSessionStorageи получить доступ к значениюisLoggedIn.

ПРИМЕЧАНИЕ . Я бы использовалnew Observable()конструктор для инкапсуляции данных_isLoggedInSourceвнутри наблюдаемого.

Вы можете хранить последнее переданное значение отдельно от Observable. Тогда прочитайте это когда необходимо.

let lastValue: number;

const subscription = new Service().start();
subscription
    .subscribe((data) => {
        lastValue = data;
    }
);

Лучший способ сделать это - использовать Behaviur Subject, вот пример:

var sub = new rxjs.BehaviorSubject([0, 1])
sub.next([2, 3])
setTimeout(() => {sub.next([4, 5])}, 1500)
sub.subscribe(a => console.log(a)) //2, 3 (current value) -> wait 2 sec -> 4, 5
Другие вопросы по тегам