RX поток запуска и остановки событий с поздней подпиской
Я пытаюсь сделать тему запуска и остановки событий, когда поздние подписчики получают только выдающиеся стартовые события. то есть. те, у которых не было соответствующего события остановки.
Вот некоторый код RxPY:
from rx.subjects import ReplaySubject
start = ReplaySubject()
start.subscribe(lambda x: print("subscriber1: " + str(x)))
start.on_next(("a", "start"))
start.on_next(("b", "start"))
start.on_next(("b", "stop"))
start.subscribe(lambda x: print("subscriber2: " + str(x)))
start.on_next(("c", "start"))
Это дает вывод:
subscriber1: ('a', 'start')
subscriber1: ('b', 'start')
subscriber1: ('b', 'stop')
subscriber2: ('a', 'start')
subscriber2: ('b', 'start')
subscriber2: ('b', 'stop')
subscriber1: ('c', 'start')
subscriber2: ('c', 'start')
Принимая во внимание, что я хотел бы:
subscriber1: ('a', 'start')
subscriber1: ('b', 'start')
subscriber1: ('b', 'stop')
subscriber2: ('a', 'start')
subscriber1: ('c', 'start')
subscriber2: ('c', 'start')
Я думаю, что требуется что-то вроде оператора сканирования, но я не могу его собрать. Любые идеи с благодарностью получили:)
1 ответ
Самое чистое решение - использовать побочные эффекты вне основного потока, чтобы обновить словарь и объединить незавершенные события с новыми подписчиками.
class EventObserver(Observer):
def __init__(self):
self.cached_events = set()
self.mirror = Subject() # re-emits all values
on_next(self, value):
self.mirror.next(value) # stream to late observers
if(value[1] == 'stop'):
try:
self.cached_events.remove(value[0])
except KeyError:
pass
else:
self.cached_events.add(value[0])
on_error(self, e):
self.mirror.error(e) # + other error logic
on_completed(self):
self.mirror.complete() # + other completion logic
late_subscribe(self, subscriber):
return Observable.merge(
Observable.from(list(self.cached_events)),
self.mirror
).subscribe(subscriber)
Используется следующим образом:
event_observer = EventObserver()
events$.subscribe(event_observer)
# late subscription:
event_observer.late_subscribe(...)
Остальная часть ответа объясняет, почему вы, вероятно, предпочтете это, а не реактивный подход.
Реактивный подход:
Вот самое простое решение, которое я мог бы придумать, если вы не возражаете против ваших поздних подписчиков, ожидающих следующего события. Как видите, это не самая красивая.
pub_events$ = events$.publish(); # in case your events$ aren't hot
replay_events$ = pub_events$.replay();
# late subscription:
replay_events$.window(events$.take(1))
.scan(lambda is_first, o:
o.reduce(lambda D, x: D.update({ x[0]: x[1] == 'stop' }) or D, {})
.flatMap(lambda D: Observable.from([ k for k, v in D.items() if v == False ]))
if is_first == True else o,
True)
.flatMap(lambda o: o)
Цель - запустить позднюю подписку с отфильтрованным списком незавершенных событий, созданным из кеша всех предыдущих событий. Самый большой барьер в том, что ReplaySubject
не отличает эти кэшированные события от новых. Первый шаг к решению этого - window
на следующее событие, ожидая ReplaySubject
испустить кэшированные события до этого. Поскольку ваше требование звучит скорее как оптимизация, а не как корректность, условия гонки здесь могут не иметь большого значения.
Существует не более двух окон: одно из кэшированных событий и одно из новых событий (если они есть), поэтому scan
немного использует слабость типа Python, чтобы проверить, в каком окне мы находимся. Если это кэшированные события, мы создаем словарь ключей событий → независимо от того, было ли это событие "остановлено". Последний шаг заключается в том, чтобы вставить неостановленные значения обратно в поток с помощью flatMap
,