Построение двойного бесконечного опроса в RX
Проблема состоит в том, чтобы эмулировать поведение двойной петли с RX:
while True:
try:
token = get_token()
while True:
try:
value = get_value_using_token(token)
do_something(value)
except:
break
except:
break
Было бы чисто, если бы две петли были заменены двумя наблюдаемыми, одна из которых выступает в качестве наблюдателя внешней, а do_something(value)
может быть заменен наблюдателем самостоятельно. Любые исключения также могут быть обработаны. Внешний цикл должен быть блокирующим, но внутренний цикл может и не быть, так как я пытаюсь использовать внешний цикл для обработки исключений, используя функцию повтора с функцией отката.
Пока что я могу построить последовательность, используя:
Observable.from_iterable(value for value in iter(get_token, None))
.subscribe(do_something)
но как я могу сделать аналогичную структуру в режиме блокировки для внешнего?
2 ответа
Вам просто нужно использовать Repeat
Оператор для создания цикла. И тогда вам нужно Retry
Оператор продолжить в случае сбоя.
Что-то вроде
Observable.return(get_token())
.flatMap(token->Observable.return(get_value_using_token(token))
.repeat())
.retry()
.subscribe(do_something)
* Я не знаю Python, поэтому я надеюсь, что вы можете конвертировать этот код PSEDO
В итоге я создал бесконечный поток функций, используя repeat
оператор и map
на его вызов.
def get_token():
return some_value
def get_value_with_token(token):
return some_value_using_token
Observable.repeat(get_token)\
.map(lambda f: f())\
.map(lambda n: O.repeat(lambda: get_value_with_token(n)))\
.concat_all()\
.map(lambda f: f())\
.subscribe(logger.info)
где get_token
а также get_value_with_token
являются функциями.
Используя функции блокировки для обоих, я могу сделать двойной цикл и применить дополнительные rx
такие операторы, как retry
к наблюдаемому.