Aiokafka обрабатывает события потребителей параллельно
Я только начинаю работать с kafka, у меня есть кластер k8s, в котором я хочу развернуть прослушиватели событий. Когда у меня работает один слушатель, все работает нормально, но с несколькими подами они обрабатывают события параллельно, хотелось бы, чтобы событие обрабатывалось только один раз. Как я могу этого добиться?
Мой код слушателя:
import asyncio, settings, json
from aiokafka import AIOKafkaConsumer
event_handler = {
"table_create": table_create_event,
"table_delete": table_delete_event,
}
consumer_config = [
{
"name": "main consumer1",
"topics": ["storage_create", "storage_update", "storage_delete",
"table_create", "table_update", "table_delete",
"field_create", "field_update", "field_delete",
"value_create", "value_update", "value_delete"],
"group_id": "cms_events"
},
{
"name": "main consume2r",
"topics": ["storage_create", "storage_update", "storage_delete",
"table_create", "table_update", "table_delete",
"field_create", "field_update", "field_delete",
"value_create", "value_update", "value_delete"],
"group_id": "cms_events"
}
]
async def consume(topics, group_id):
consumer = AIOKafkaConsumer(
*topics,
bootstrap_servers='localhost:9092',
group_id=group_id,
auto_offset_reset="earliest",
metadata_max_age_ms=30000,
)
await consumer.start()
try:
async for msg in consumer:
print(
"{}:{:d}:{:d}: key={} value={} timestamp_ms={}".format(
msg.topic, msg.partition, msg.offset, msg.key, msg.value,
msg.timestamp)
)
topic = msg.topic
encode_event_body = msg.value
decode_event_body = json.loads(encode_event_body)
try:
await event_handler[topic](decode_event_body)
except Exception as exc:
print(exc)
finally:
await consumer.stop()
async def main():
await asyncio.gather(*[
consume(topics=consumer.get("topics"), group_id=consumer.get("group_id"))
for consumer in consumer_config
]
)
if __name__ == "__main__":
asyncio.run(main())