Вернуть значение из наблюдаемого через rxpy

Каков элегантный способ преобразования объекта rx.Observable в "нормальный" объект в функции?

например:

def foo():
    return rx.Observable.just('value').subscribe(<some magic here>)

>>> print(foo())

# expected:
# value
# however get:
# <rx.disposables.anonymousdisposable.AnonymousDisposable at SOME ADDRESS>

Я понимаю, что возврат подписки является одноразовым объектом, и "ужасный" способ реализовать это:

class Foo:
    def __init__(self):
        self.buffer = None

    def call_kernel(self):
        rx.Observable.just('value').subscribe(lambda v: self.buffer = v)

    def __call__(self):
        self.call_kernel()
        return self.buffer
>>> Foo()

# get:
# value

Есть ли лучший способ сделать это?

Благодарю.

1 ответ

Решение

Взгляни на Observable::to_blocking(): это создает BlockingObservable который принудительно вносится в список, содержащий все испускаемые элементы. Для вашего примера:

def foo():
  return list(rx.Observable.just('value').to_blocking())[0]

Я также хотел бы отметить, что ваше второе решение опасно, потому что нет точной гарантии того, когда будут получены значения, а ваше __call__ опирается на 'value' испустить немедленно.

Другие вопросы по тегам