Построение двойного бесконечного опроса в RX

Проблема состоит в том, чтобы эмулировать поведение двойной петли с RX:

while True:
    try:
        token = get_token()
        while True:
            try:
                value = get_value_using_token(token)
                do_something(value)
            except:
                break
    except:
        break

Было бы чисто, если бы две петли были заменены двумя наблюдаемыми, одна из которых выступает в качестве наблюдателя внешней, а do_something(value) может быть заменен наблюдателем самостоятельно. Любые исключения также могут быть обработаны. Внешний цикл должен быть блокирующим, но внутренний цикл может и не быть, так как я пытаюсь использовать внешний цикл для обработки исключений, используя функцию повтора с функцией отката.

Пока что я могу построить последовательность, используя:

Observable.from_iterable(value for value in iter(get_token, None))
    .subscribe(do_something)

но как я могу сделать аналогичную структуру в режиме блокировки для внешнего?

2 ответа

Вам просто нужно использовать Repeat Оператор для создания цикла. И тогда вам нужно Retry Оператор продолжить в случае сбоя.

Что-то вроде

Observable.return(get_token())
    .flatMap(token->Observable.return(get_value_using_token(token))
        .repeat())
    .retry()
.subscribe(do_something)

* Я не знаю Python, поэтому я надеюсь, что вы можете конвертировать этот код PSEDO

В итоге я создал бесконечный поток функций, используя repeat оператор и map на его вызов.

def get_token():
    return some_value

def get_value_with_token(token):
    return some_value_using_token

Observable.repeat(get_token)\
    .map(lambda f: f())\
    .map(lambda n: O.repeat(lambda: get_value_with_token(n)))\
    .concat_all()\
    .map(lambda f: f())\
    .subscribe(logger.info)

где get_token а также get_value_with_token являются функциями.

Используя функции блокировки для обоих, я могу сделать двойной цикл и применить дополнительные rx такие операторы, как retry к наблюдаемому.

Другие вопросы по тегам