RXPY вводить предметы в наблюдаемую
Этот вопрос касается rxpy.
Я пытаюсь построить реактивную систему, которая обрабатывает сообщения от наблюдаемого источника. В дополнение к этому я пытаюсь интегрировать это с системой выборов лидера, основанной на zookeeper.
Эта комбинация позволит только одному лидеру в ферме процессов обрабатывать поток сообщений. Ниже суть кода, который я пытаюсь создать.
# event_source is an observable of messages
# manager.leaders is an observable of leader election events
# manager.followers is an observable of leader relinquish events
event_source\
.skip_until(manager.leaders)\
.take_until(manager.followers)\
.subscribe(observer)
Работает нормально и все, но мне нужно вводить между skip_until
а также take_until
кусок для обработки засыпки. Это предназначено для устранения потенциального разрыва между сбоем процесса лидера и другим процессом, предполагающим лидерство. Каждое обработанное сообщение оставит запись, так что новый руководитель сможет наверстать упущенное, если оно есть, перед тем, как продолжить работу с потоком.
Я старался start_with
Оператор без успеха. Разве я не подхожу к нему таким образом, для которого он не предназначен?
В конечном счете, решение, которое я ищу, состоит в том, чтобы добавить определенное количество элементов в поток, вызванный событием из другого потока.
1 ответ
Как насчет этого:
manager.leaders \
.flat_map(lambda e: event_source
.start_with(...)
.take_until(manager.followers))
Каждый раз manager.leaders
испускает сообщение event_source
будет подписан на, начиная с введенных предметов, до manager.followers
излучает.