Два агента с разными фильтрами на одну тему кафки. Признание в Faust Stream

Я хочу, чтобы два агента faust слушали одну и ту же тему kafka, но каждый агент использует свой собственный фильтр перед обработкой событий, и их наборы событий не пересекаются.

В документации есть пример:https://faust.readthedocs.io/en/latest/userguide/streams.html

Если два агента используют потоки, подписанные на одну и ту же тему:

 topic = app.topic('orders')

 @app.agent(topic)
 async def processA(stream):
      async for value in stream:
          print(f'A: {value}')

 @app.agent(topic)
  async def processB(stream):
       async for value in stream:
           print(f'B: {value}')

Conductor будет пересылать каждое сообщение, полученное по теме "заказы", ​​обоим агентам, увеличивая счетчик ссылок всякий раз, когда оно входит в поток агентов.

Счетчик ссылок уменьшается, когда событие подтверждается, и когда оно достигает нуля, потребитель будет рассматривать это смещение как "выполненное" и может его зафиксировать.

И ниже для фильтров https://faust.readthedocs.io/en/latest/userguide/streams.html:

@app.agent() async def process(stream):
    async for value in stream.filter(lambda: v > 1000).group_by(...):
        ...

Я использую какой-то сложный фильтр, но в результате делю поток на две части для двух агентов с совершенно разной логикой. (Я не использую group_by)

Если два агента работают вместе, все в порядке. Но если я остановлю их и перезапущу, каждый будет обрабатывать поток с самого начала. Потому что каждое событие не было подтверждено ни одним из агентов. Если я подтверждаю все события в каждом агенте, когда один из агентов не запускается, второй очищает тему. (Если один будет раздавлен и перезапущен, проводник увидит трех абонентов, так как он 20 минут ждет ответа раздавленного агента).

Я просто хочу разделить события на две части. Как мне сделать соответствующую синхронизацию в этом случае?

1 ответ

в faustу фильтрации есть некоторые ошибки, когда дело доходит до подтверждения отфильтрованных событий. Я предлагаю не использовать fault.filter() особенность, но простая if...then...else стиль операторов при потреблении из потока, аналогичный приведенному ниже:

      @app.agent(topic)
async def process(stream):
    async for event in stream:
        if event.amount >= 300.0:
            yield event
Другие вопросы по тегам