RX поток запуска и остановки событий с поздней подпиской

Я пытаюсь сделать тему запуска и остановки событий, когда поздние подписчики получают только выдающиеся стартовые события. то есть. те, у которых не было соответствующего события остановки.

Вот некоторый код RxPY:

from rx.subjects import ReplaySubject

start = ReplaySubject()

start.subscribe(lambda x: print("subscriber1: " + str(x)))

start.on_next(("a", "start"))
start.on_next(("b", "start"))
start.on_next(("b", "stop"))

start.subscribe(lambda x: print("subscriber2: " + str(x)))

start.on_next(("c", "start"))

Это дает вывод:

subscriber1: ('a', 'start')
subscriber1: ('b', 'start')
subscriber1: ('b', 'stop')
subscriber2: ('a', 'start')
subscriber2: ('b', 'start')
subscriber2: ('b', 'stop')
subscriber1: ('c', 'start')
subscriber2: ('c', 'start')

Принимая во внимание, что я хотел бы:

subscriber1: ('a', 'start')
subscriber1: ('b', 'start')
subscriber1: ('b', 'stop')
subscriber2: ('a', 'start')
subscriber1: ('c', 'start')
subscriber2: ('c', 'start')

Я думаю, что требуется что-то вроде оператора сканирования, но я не могу его собрать. Любые идеи с благодарностью получили:)

1 ответ

Самое чистое решение - использовать побочные эффекты вне основного потока, чтобы обновить словарь и объединить незавершенные события с новыми подписчиками.

class EventObserver(Observer):
  def __init__(self):
    self.cached_events = set()
    self.mirror = Subject() # re-emits all values

  on_next(self, value):
    self.mirror.next(value) # stream to late observers
    if(value[1] == 'stop'):
      try:
        self.cached_events.remove(value[0])
      except KeyError:
        pass
    else:
      self.cached_events.add(value[0])

  on_error(self, e):
    self.mirror.error(e) # + other error logic

  on_completed(self):
    self.mirror.complete() # + other completion logic

  late_subscribe(self, subscriber):
    return Observable.merge(
      Observable.from(list(self.cached_events)),
      self.mirror
    ).subscribe(subscriber)

Используется следующим образом:

event_observer = EventObserver()
events$.subscribe(event_observer)

# late subscription:
event_observer.late_subscribe(...)

Остальная часть ответа объясняет, почему вы, вероятно, предпочтете это, а не реактивный подход.


Реактивный подход:

Вот самое простое решение, которое я мог бы придумать, если вы не возражаете против ваших поздних подписчиков, ожидающих следующего события. Как видите, это не самая красивая.

pub_events$ = events$.publish(); # in case your events$ aren't hot
replay_events$ = pub_events$.replay();

# late subscription:
replay_events$.window(events$.take(1))
              .scan(lambda is_first, o: 
                      o.reduce(lambda D, x: D.update({ x[0]: x[1] == 'stop' }) or D, {})
                       .flatMap(lambda D: Observable.from([ k for k, v in D.items() if v == False ]))
                      if is_first == True else o,
                    True)
              .flatMap(lambda o: o)

Цель - запустить позднюю подписку с отфильтрованным списком незавершенных событий, созданным из кеша всех предыдущих событий. Самый большой барьер в том, что ReplaySubject не отличает эти кэшированные события от новых. Первый шаг к решению этого - window на следующее событие, ожидая ReplaySubject испустить кэшированные события до этого. Поскольку ваше требование звучит скорее как оптимизация, а не как корректность, условия гонки здесь могут не иметь большого значения.

Существует не более двух окон: одно из кэшированных событий и одно из новых событий (если они есть), поэтому scan немного использует слабость типа Python, чтобы проверить, в каком окне мы находимся. Если это кэшированные события, мы создаем словарь ключей событий → независимо от того, было ли это событие "остановлено". Последний шаг заключается в том, чтобы вставить неостановленные значения обратно в поток с помощью flatMap,

Другие вопросы по тегам