Два агента с разными фильтрами на одну тему кафки. Признание в Faust Stream
Я хочу, чтобы два агента faust слушали одну и ту же тему kafka, но каждый агент использует свой собственный фильтр перед обработкой событий, и их наборы событий не пересекаются.
В документации есть пример:https://faust.readthedocs.io/en/latest/userguide/streams.html
Если два агента используют потоки, подписанные на одну и ту же тему:
topic = app.topic('orders') @app.agent(topic) async def processA(stream): async for value in stream: print(f'A: {value}') @app.agent(topic) async def processB(stream): async for value in stream: print(f'B: {value}')
Conductor будет пересылать каждое сообщение, полученное по теме "заказы", обоим агентам, увеличивая счетчик ссылок всякий раз, когда оно входит в поток агентов.
Счетчик ссылок уменьшается, когда событие подтверждается, и когда оно достигает нуля, потребитель будет рассматривать это смещение как "выполненное" и может его зафиксировать.
И ниже для фильтров https://faust.readthedocs.io/en/latest/userguide/streams.html:
@app.agent() async def process(stream): async for value in stream.filter(lambda: v > 1000).group_by(...): ...
Я использую какой-то сложный фильтр, но в результате делю поток на две части для двух агентов с совершенно разной логикой. (Я не использую group_by)
Если два агента работают вместе, все в порядке. Но если я остановлю их и перезапущу, каждый будет обрабатывать поток с самого начала. Потому что каждое событие не было подтверждено ни одним из агентов. Если я подтверждаю все события в каждом агенте, когда один из агентов не запускается, второй очищает тему. (Если один будет раздавлен и перезапущен, проводник увидит трех абонентов, так как он 20 минут ждет ответа раздавленного агента).
Я просто хочу разделить события на две части. Как мне сделать соответствующую синхронизацию в этом случае?
1 ответ
в
faust
у фильтрации есть некоторые ошибки, когда дело доходит до подтверждения отфильтрованных событий. Я предлагаю не использовать
fault.filter()
особенность, но простая
if...then...else
стиль операторов при потреблении из потока, аналогичный приведенному ниже:
@app.agent(topic)
async def process(stream):
async for event in stream:
if event.amount >= 300.0:
yield event