Aiokafka обрабатывает события потребителей параллельно

Я только начинаю работать с kafka, у меня есть кластер k8s, в котором я хочу развернуть прослушиватели событий. Когда у меня работает один слушатель, все работает нормально, но с несколькими подами они обрабатывают события параллельно, хотелось бы, чтобы событие обрабатывалось только один раз. Как я могу этого добиться?

Мой код слушателя:

      import asyncio, settings, json
from aiokafka import AIOKafkaConsumer


event_handler = {
    "table_create": table_create_event,
    "table_delete": table_delete_event,
}

consumer_config = [
    {
        "name": "main consumer1",
        "topics": ["storage_create", "storage_update", "storage_delete",
                   "table_create", "table_update", "table_delete",
                   "field_create", "field_update", "field_delete",
                   "value_create", "value_update", "value_delete"],
        "group_id": "cms_events"
    },
    {
        "name": "main consume2r",
        "topics": ["storage_create", "storage_update", "storage_delete",
                   "table_create", "table_update", "table_delete",
                   "field_create", "field_update", "field_delete",
                   "value_create", "value_update", "value_delete"],
        "group_id": "cms_events"
    }
]


async def consume(topics, group_id):
    consumer = AIOKafkaConsumer(
        *topics,
        bootstrap_servers='localhost:9092',
        group_id=group_id,
        auto_offset_reset="earliest",
        metadata_max_age_ms=30000,
    )
    await consumer.start()
    try:
        async for msg in consumer:
            print(
                "{}:{:d}:{:d}: key={} value={} timestamp_ms={}".format(
                    msg.topic, msg.partition, msg.offset, msg.key, msg.value,
                    msg.timestamp)
            )
            topic = msg.topic
            encode_event_body = msg.value
            decode_event_body = json.loads(encode_event_body)
            try:
                await event_handler[topic](decode_event_body)
            except Exception as exc:
                print(exc)
    finally:
        await consumer.stop()


async def main():
    await asyncio.gather(*[
            consume(topics=consumer.get("topics"), group_id=consumer.get("group_id"))
            for consumer in consumer_config
        ]
    )


if __name__ == "__main__":
    asyncio.run(main())

0 ответов

Другие вопросы по тегам