Вернуть значение из наблюдаемого через rxpy
Каков элегантный способ преобразования объекта rx.Observable в "нормальный" объект в функции?
например:
def foo():
return rx.Observable.just('value').subscribe(<some magic here>)
>>> print(foo())
# expected:
# value
# however get:
# <rx.disposables.anonymousdisposable.AnonymousDisposable at SOME ADDRESS>
Я понимаю, что возврат подписки является одноразовым объектом, и "ужасный" способ реализовать это:
class Foo:
def __init__(self):
self.buffer = None
def call_kernel(self):
rx.Observable.just('value').subscribe(lambda v: self.buffer = v)
def __call__(self):
self.call_kernel()
return self.buffer
>>> Foo()
# get:
# value
Есть ли лучший способ сделать это?
Благодарю.
1 ответ
Решение
Взгляни на Observable::to_blocking()
: это создает BlockingObservable
который принудительно вносится в список, содержащий все испускаемые элементы. Для вашего примера:
def foo():
return list(rx.Observable.just('value').to_blocking())[0]
Я также хотел бы отметить, что ваше второе решение опасно, потому что нет точной гарантии того, когда будут получены значения, а ваше __call__
опирается на 'value'
испустить немедленно.