python aiokafka многие потребители многим производителям
Я использую aiokafka для потребления, фильтрации полей сообщений и отправки сообщений обратно в kafka. Я запускаю 4 асинхронных потребителя, которые помещают сообщения в асинхронную очередь. Затем один процесс использует эту очередь и создает асинхронную output_queue. Несколько продуктов потребляют из асинхронной очереди output_queue и отправляют обратно в kafka.
Я хотел добиться решения, поэтому у меня было бы:
МНОГИЕ потребители >> процессор >> МНОГИЕ производители.
Я хотел бы сначала решить проблему с потребителями/производителями, прежде чем сосредоточиться на процессоре.
Проблема, с которой я сталкиваюсь, заключается в том, что код создает медленно, например, 50 сообщений в секунду. У меня есть поток из 100 тысяч сообщений, поэтому у меня должна быть ошибка в коде.
Как я могу это исправить?
import asyncio
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
import json
BROKERS = [
"BROKER0:PORT",
"BROKER1:PORT",
"BROKER2:PORT",
]
GROUP_ID = "group_id"
TOPIC_INPUT = "topic_input"
TOPIC_OUTPUT = "topic_output"
async def consume(queue):
consumer = AIOKafkaConsumer(
TOPIC_INPUT,
bootstrap_servers=BROKERS,
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
group_id=GROUP_ID,
auto_offset_reset="latest"
)
await consumer.start()
try:
async for message in consumer:
processed_message = {
"timestamp": message.timestamp,
"col1": message.value["col1"],
"col2": message.value["col2"],
"col3": message.value["col3"],
}
await queue.put(processed_message)
finally:
await consumer.stop()
async def process_message(message):
print(message)
return message
async def process_messages(queue, output_queue):
while True:
message = await queue.get()
processed_message = await process_message(message)
await output_queue.put(processed_message)
queue.task_done()
# async def produce(output_queue):
# producer = AIOKafkaProducer(
# bootstrap_servers=BROKERS,
# value_serializer=lambda m: json.dumps(m).encode('utf-8')
# )
# await producer.start()
# try:
# while True:
# message = await output_queue.get()
# print(message)
# await producer.send_and_wait(TOPIC_OUTPUT, message)
# output_queue.task_done()
# finally:
# await producer.stop()
async def main():
queue = asyncio.Queue(maxsize=1000000)
output_queue = asyncio.Queue(maxsize=1000000)
consumers = [asyncio.create_task(consume(queue)) for i in range(4)]
# producers = [asyncio.create_task(produce(output_queue)) for i in range(3)]
process_task = asyncio.create_task(process_messages(queue, output_queue))
# await asyncio.gather(*consumers, *producers, process_task)
await asyncio.gather(*consumers, process_task)
if __name__ == '__main__':
asyncio.run(main())