Как получить текущее значение RxJS Subject или Observable?
У меня есть сервис Angular 2:
import {Storage} from './storage';
import {Injectable} from 'angular2/core';
import {Subject} from 'rxjs/Subject';
@Injectable()
export class SessionStorage extends Storage {
private _isLoggedInSource = new Subject<boolean>();
isLoggedIn = this._isLoggedInSource.asObservable();
constructor() {
super('session');
}
setIsLoggedIn(value: boolean) {
this.setItem('_isLoggedIn', value, () => {
this._isLoggedInSource.next(value);
});
}
}
Все отлично работает. Но у меня есть другой компонент, которому не нужно подписываться, ему просто нужно получить текущее значение isLoggedIn в определенный момент времени. Как я могу это сделать?
13 ответов
Subject
или же Observable
не имеет текущего значения. Когда значение передается, оно передается подписчикам и Observable
сделано с этим
Если вы хотите иметь текущее значение, используйте BehaviorSubject
который предназначен именно для этой цели. BehaviorSubject
сохраняет последнее переданное значение и немедленно отправляет его новым подписчикам.
У этого также есть метод getValue()
чтобы получить текущее значение.
Единственный способ, которым вы должны получать значения "из" Observable/Subject - это подписаться!
Если вы используете getValue()
вы делаете что-то обязательное в декларативной парадигме. Он есть в качестве аварийного люка, но 99,9% времени НЕ следует использовать getValue()
, Есть несколько интересных вещей, которые getValue()
будет делать: он выдаст ошибку, если субъект был отписан, он не даст вам получить значение, если субъект мертв, потому что он ошибочен и т. д. Но, опять же, он используется в качестве аварийного люка для редких обстоятельств.
Существует несколько способов получения последнего значения от субъекта или наблюдаемого способом "Rx-y":
- С помощью
BehaviorSubject
: Но на самом деле подписаться на это. Когда вы впервые подписываетесь наBehaviorSubject
он будет синхронно отправлять предыдущее значение, которое он получил или был инициализирован. - Используя
ReplaySubject(N)
: Это будет кешN
значения и воспроизвести их для новых подписчиков. A.withLatestFrom(B)
: Используйте этот оператор, чтобы получить самое последнее значение из наблюдаемогоB
когда наблюдаемыйA
излучает. Даст вам оба значения в массиве[a, b]
,A.combineLatest(B)
: Используйте этот оператор, чтобы получить самые последние значения изA
а такжеB
каждый раз либоA
или жеB
излучает. Даст вам оба значения в массиве.shareReplay()
: Делает наблюдаемую многоадресную рассылку черезReplaySubject
, но позволяет повторить наблюдаемое при ошибке. (В основном это дает вам обещанное поведение кеширования).publishReplay()
,publishBehavior(initialValue)
,multicast(subject: BehaviorSubject | ReplaySubject)
и т. д. Другие операторы, которые используютBehaviorSubject
а такжеReplaySubject
, Различные разновидности одной и той же вещи, они в основном направляют источник, наблюдаемый путем направления всех уведомлений через тему. Вам нужно позвонитьconnect()
подписаться на источник с темой.
У меня была похожая ситуация, когда поздние подписчики подписывались на Предмет после того, как его ценность прибыла.
Я обнаружил, что ReplaySubject, который похож на BehaviorSubject, работает как шарм в этом случае. А вот ссылка на лучшее объяснение: http://reactivex.io/rxjs/manual/overview.html
const observable = of('response')
function hasValue(value: any) {
return value !== null && value !== undefined;
}
function getValue<T>(observable: Observable<T>): Promise<T> {
return observable
.pipe(
filter(hasValue),
first()
)
.toPromise();
}
const result = await getValue(observable)
// Do the logic with the result
// .................
// .................
// .................
Вы можете проверить полную статью о том, как реализовать это здесь. https://www.imkrish.com/how-to-get-current-value-of-observable-in-a-clean-way/
Аналогичный ответ был отклонен. Но я думаю, что могу оправдать то, что я предлагаю здесь для ограниченных случаев.
Хотя верно, что наблюдаемая не имеет текущего значения, очень часто она будет иметь сразу доступное значение. Например, в хранилищах redux / flux / akita вы можете запрашивать данные из центрального хранилища, основываясь на количестве наблюдаемых, и это значение обычно будет сразу же доступно.
Если это так, то когда вы subscribe
, значение вернется немедленно.
Допустим, у вас был вызов в службу, и по окончании вы хотите получить из вашего магазина самую свежую ценность, которая потенциально может не генерироваться:
Вы можете попытаться сделать это (и вы должны как можно больше держать вещи "в трубах"):
serviceCallResponse$.pipe(withLatestFrom(store$.select(x => x.customer)))
.subscribe(([ serviceCallResponse, customer] => {
// we have serviceCallResponse and customer
});
Проблема в том, что он будет блокироваться до тех пор, пока вторичная наблюдаемая не выдаст значение, которое потенциально может никогда не быть.
Недавно я обнаружил, что мне нужно оценивать наблюдаемое только в том случае, если значение было немедленно доступно, и, что более важно, мне нужно было уметь определять, не было ли оно. Я закончил тем, что сделал это:
serviceCallResponse$.pipe()
.subscribe(serviceCallResponse => {
// immediately try to subscribe to get the 'available' value
// note: immediately unsubscribe afterward to 'cancel' if needed
let customer = undefined;
// whatever the secondary observable is
const secondary$ = store$.select(x => x.customer);
// subscribe to it, and assign to closure scope
sub = secondary$.pipe(take(1)).subscribe(_customer => customer = _customer);
sub.unsubscribe();
// if there's a delay or customer isn't available the value won't have been set before we get here
if (customer === undefined)
{
// handle, or ignore as needed
return throwError('Customer was not immediately available');
}
});
Обратите внимание, что для всего вышеперечисленного я использую subscribe
чтобы получить значение (как обсуждает @Ben). Не используя .value
собственность, даже если бы у меня был BehaviorSubject
,
Я сталкивался с той же проблемой в дочерних компонентах, где первоначально он должен был бы иметь текущее значение Subject, а затем подписаться на Subject для прослушивания изменений. Я просто поддерживаю текущее значение в Сервисе, чтобы оно было доступно для компонентов, например:
import {Storage} from './storage';
import {Injectable} from 'angular2/core';
import {Subject} from 'rxjs/Subject';
@Injectable()
export class SessionStorage extends Storage {
isLoggedIn: boolean;
private _isLoggedInSource = new Subject<boolean>();
isLoggedIn = this._isLoggedInSource.asObservable();
constructor() {
super('session');
this.currIsLoggedIn = false;
}
setIsLoggedIn(value: boolean) {
this.setItem('_isLoggedIn', value, () => {
this._isLoggedInSource.next(value);
});
this.isLoggedIn = value;
}
}
Компонент, которому нужно текущее значение, может просто получить к нему доступ из службы, то есть:
sessionStorage.isLoggedIn
Не уверен, что это правильная практика:)
Подписку можно создать и после взятия первого отправленного элемента уничтожить. Pipe - это функция, которая использует Observable в качестве входных данных и возвращает другой Observable в качестве выходных данных, не изменяя при этом первую наблюдаемую. Угловой 8.1.0. Пакеты:"rxjs": "6.5.3"
, "rxjs-observable": "0.0.7"
ngOnInit() {
...
// If loading with previously saved value
if (this.controlValue) {
// Take says once you have 1, then close the subscription
this.selectList.pipe(take(1)).subscribe(x => {
let opt = x.find(y => y.value === this.controlValue);
this.updateValue(opt);
});
}
}
Хотя это может показаться излишним, это всего лишь еще одно "возможное" решение, позволяющее сохранить тип Observable и уменьшить количество шаблонов...
Вы всегда можете создать расширение для получения текущего значения Observable.
Для этого вам необходимо продлить Observable<T>
интерфейс в global.d.ts
файл объявления печатных машин. Затем реализовать расширение геттер в observable.extension.ts
файл и, наконец, включить в ваше приложение как набор текста, так и файл расширения.
Вы можете обратиться к этому ответу Stackru, чтобы узнать, как включить расширения в ваше приложение Angular.
// global.d.ts
declare module 'rxjs' {
interface Observable<T> {
/**
* _Extension Method_ - Returns current value of an Observable.
* Value is retrieved using _first()_ operator to avoid the need to unsubscribe.
*/
value: Observable<T>;
}
}
// observable.extension.ts
Object.defineProperty(Observable.prototype, 'value', {
get <T>(this: Observable<T>): Observable<T> {
return this.pipe(
filter(value => value !== null && value !== undefined),
first());
},
});
// using the extension getter example
this.myObservable$.value
.subscribe(value => {
// whatever code you need...
});
Этого можно добиться двумя способами.
BehaviorSubject имеет метод
getValue()
которое вы можете получить в определенный момент времени.Вы можете подписаться напрямую с помощью BehaviorSubject и передать подписанное значение члену класса, полю или свойству.
Я бы не рекомендовал оба подхода.
В первом подходе это удобный метод, вы можете получить значение в любое время, вы можете называть его текущим снимком на тот момент времени. Проблема в том, что вы можете ввести в свой код условия гонки, вы можете вызывать этот метод во многих разных местах и в разное время, что трудно отлаживать.
Второй подход - это то, что используют большинство разработчиков, когда они хотят получить необработанное значение при подписке, вы можете отслеживать подписку, и когда вы точно отказываетесь от подписки, чтобы избежать дальнейшей утечки памяти, вы можете использовать это, если действительно отчаянно хотите привязать его к переменной. и нет других способов связать это.
Я бы порекомендовал, еще раз взглянув на ваши варианты использования, где вы его используете? Например, вы хотите определить, вошел ли пользователь в систему или нет, когда вы вызываете какой-либо API, вы можете объединить его с другими наблюдаемыми:
const data$ = apiRequestCall$().pipe(
// Latest snapshot from BehaviorSubject.
withLatestFrom(isLoggedIn),
// Allow only call if loggedin.
filter(([request, loggedIn) => loggedIn)
// Do something else..
)
При этом вы можете использовать его непосредственно в пользовательском интерфейсе, подключив
data$ | async
в случае углового.
Другой подход. Если вы хотите/можете использовать асинхронное ожидание (должно быть внутри асинхронных функций), вы можете сделать это с помощью современных Rxjs:
async myFunction () {
const currentValue = await firstValueFrom(
of(0).pipe(
withLatestFrom(this.yourObservable$),
map((tuple) => tuple[1]),
take(1)
)
);
// do stuff with current value
}
Это выдаст значение «Сразу же» из-заwithLatestFrom
, а затем разрешит обещание.
Наблюдаемые — это просто функции. Они возвращают поток значений, но чтобы увидеть эти значения, вам необходимо на них подписаться. Самый простой способ — позвонитьsubscribe()
метод для наблюдаемого и передать наблюдателя илиnext()
обратный вызов в качестве аргумента.
В качестве альтернативы вы можете использовать подписчика, который является реализацией Observer, но предоставляет вам дополнительные возможности, такие какunsubscribe
.
import { Subscription } from 'rxjs';
export class SessionStorage extends Storage {
private _isLoggedInSource = new Subject<boolean>();
privat _subs = new Subscription();
isLoggedIn$ = this._isLoggedInSource.asObservable();
isLoggedIn = false;
constructor() {
super('session');
this._subs.add(this.isLoggedIn$.subscribe((value) => this.isLoggedIn = v))
}
setIsLoggedIn(value: boolean) {
this.setItem('_isLoggedIn', value, () => {
this._isLoggedInSource.next(value);
});
}
}
Затем в другом компоненте вы можете импортироватьSessionStorage
и получить доступ к значениюisLoggedIn
.
ПРИМЕЧАНИЕ . Я бы использовалnew Observable()
конструктор для инкапсуляции данных_isLoggedInSource
внутри наблюдаемого.
Вы можете хранить последнее переданное значение отдельно от Observable. Тогда прочитайте это когда необходимо.
let lastValue: number;
const subscription = new Service().start();
subscription
.subscribe((data) => {
lastValue = data;
}
);
Лучший способ сделать это - использовать Behaviur Subject
, вот пример:
var sub = new rxjs.BehaviorSubject([0, 1])
sub.next([2, 3])
setTimeout(() => {sub.next([4, 5])}, 1500)
sub.subscribe(a => console.log(a)) //2, 3 (current value) -> wait 2 sec -> 4, 5