RXPY вводить предметы в наблюдаемую

Этот вопрос касается rxpy.

Я пытаюсь построить реактивную систему, которая обрабатывает сообщения от наблюдаемого источника. В дополнение к этому я пытаюсь интегрировать это с системой выборов лидера, основанной на zookeeper.

Эта комбинация позволит только одному лидеру в ферме процессов обрабатывать поток сообщений. Ниже суть кода, который я пытаюсь создать.

# event_source is an observable of messages
# manager.leaders is an observable of leader election events
# manager.followers is an observable of leader relinquish events
event_source\
    .skip_until(manager.leaders)\
    .take_until(manager.followers)\
    .subscribe(observer)

Работает нормально и все, но мне нужно вводить между skip_until а также take_until кусок для обработки засыпки. Это предназначено для устранения потенциального разрыва между сбоем процесса лидера и другим процессом, предполагающим лидерство. Каждое обработанное сообщение оставит запись, так что новый руководитель сможет наверстать упущенное, если оно есть, перед тем, как продолжить работу с потоком.

Я старался start_with Оператор без успеха. Разве я не подхожу к нему таким образом, для которого он не предназначен?

В конечном счете, решение, которое я ищу, состоит в том, чтобы добавить определенное количество элементов в поток, вызванный событием из другого потока.

1 ответ

Как насчет этого:

manager.leaders \
    .flat_map(lambda e: event_source
                  .start_with(...)
                  .take_until(manager.followers))

Каждый раз manager.leaders испускает сообщение event_source будет подписан на, начиная с введенных предметов, до manager.followers излучает.

Другие вопросы по тегам