Как я могу "ждать" на Rx Observable?

Я хотел бы иметь возможность ждать на наблюдаемой, например,

const source = Rx.Observable.create(/* ... */)
//...
await source;

Наивная попытка приводит к тому, что ожидание разрешается немедленно и не блокирует выполнение

Изменить: псевдокод для моего полного предполагаемого использования:

if (condition) {
  await observable;
}
// a bunch of other code

Я понимаю, что могу переместить другой код в другую отдельную функцию и передать его в обратный вызов подписки, но я надеюсь, что смогу избежать этого.

7 ответов

Решение

Вы должны передать обещание await, Преобразуйте следующее событие наблюдаемого в обещание и ждите этого.

if (condition) {
  await observable.first().toPromise();
}

Редактировать примечание: в этом ответе изначально использовался.take(1), но он был изменен на использование.first(), что позволяет избежать проблемы Promise, которая никогда не будет решена, если поток заканчивается до того, как будет получено значение.

Используйте новый firstValueFrom() или lastValueFrom() вместо того toPromise(), который, как указано здесь, устарел, начиная с RxJS 7, и будет удален в RxJS 8.

import { firstValueFrom} from 'rxjs';
import { lastValueFrom } from 'rxjs';

this.myProp = await firstValueFrom(myObservable$);
this.myProp = await lastValueFrom(myObservable$);

Это доступно в RxJS 7+

См.: https://indepth.dev/rxjs-heads-up-topromise-is-being-deprecated/

Должно быть

await observable.first().toPromise();

Как было отмечено в комментариях ранее, существует существенная разница между take(1) а также first() операторы, когда есть пустые законченные наблюдаемые.

Observable.empty().first().toPromise() приведет к отклонению с EmptyError с этим можно обращаться соответственно, что обычно является желательным поведением.

А также Observable.empty().take(1).toPromise() приведет к ожидающему обещанию, которое желательно... почти никогда.

Если toPromise устарел для вас, вы можете использовать .pipe(take(1)).toPromiseно как вы можете видеть здесь, это не рекомендуется.

Поэтому, пожалуйста, используйте toPromise (RxJs 6) как сказано:

//return basic observable
const sample = val => Rx.Observable.of(val).delay(5000);
//convert basic observable to promise
const example = sample('First Example')
  .toPromise()
  //output: 'First Example'
  .then(result => {
    console.log('From Promise:', result);
  });

Пример async/await:

//return basic observable
const sample = val => Rx.Observable.of(val).delay(5000);
//convert basic observable to promise
const example = await sample('First Example').toPromise()
// output: 'First Example'
console.log('From Promise:', result);

Подробнее читайте здесь.

И удалите, пожалуйста, это неправильное заявление, в котором говорится toPromise устарела.

Вам нужно будет await обещание, так что вы захотите использовать toPromise(), Смотрите это для более подробной информации о toPromise(),

С использованием toPromise()не рекомендуется, так как в RxJs 7 и далее он обесценивается. Вы можете использовать два новых оператора, присутствующих в RxJs 7 lastValueFrom() и firstValueFrom(). Более подробную информацию можно найти здесь

      const result = await lastValueFrom(myObservable$);

Реализации в бета-версии доступны здесь:

Я использую RxJS V 6.4.0, поэтому я должен использовать устаревший в V 7.xx. Вдохновленный другими ответами, вот что я сделал с toPromise()

      import { first, ... } from 'rxjs/operators';

...

if (condition) {
  await observable$.pipe(first()).toPromise();
}

...

Обратите внимание, как я использовал last()внутри pipe(). Потому что на моем observable.first()не работает так же, как упомянуто user1289657

Надеюсь, это поможет другим, кто использует RxJS V 6.xx, как и я :).

Спасибо.

Другие вопросы по тегам