python aiokafka многие потребители многим производителям

Я использую aiokafka для потребления, фильтрации полей сообщений и отправки сообщений обратно в kafka. Я запускаю 4 асинхронных потребителя, которые помещают сообщения в асинхронную очередь. Затем один процесс использует эту очередь и создает асинхронную output_queue. Несколько продуктов потребляют из асинхронной очереди output_queue и отправляют обратно в kafka.

Я хотел добиться решения, поэтому у меня было бы:

МНОГИЕ потребители >> процессор >> МНОГИЕ производители.

Я хотел бы сначала решить проблему с потребителями/производителями, прежде чем сосредоточиться на процессоре.

Проблема, с которой я сталкиваюсь, заключается в том, что код создает медленно, например, 50 сообщений в секунду. У меня есть поток из 100 тысяч сообщений, поэтому у меня должна быть ошибка в коде.

Как я могу это исправить?

      import asyncio
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
import json

BROKERS = [
    "BROKER0:PORT",
    "BROKER1:PORT",
    "BROKER2:PORT",
]

GROUP_ID = "group_id"
TOPIC_INPUT = "topic_input"
TOPIC_OUTPUT = "topic_output"


async def consume(queue):
    consumer = AIOKafkaConsumer(
        TOPIC_INPUT,
        bootstrap_servers=BROKERS,
        value_deserializer=lambda m: json.loads(m.decode('utf-8')),
        group_id=GROUP_ID,
        auto_offset_reset="latest"
    )
    await consumer.start()
    try:
        async for message in consumer:
            processed_message = {
                "timestamp": message.timestamp,
                "col1": message.value["col1"],
                "col2": message.value["col2"],
                "col3": message.value["col3"],
            }
            await queue.put(processed_message)
    finally:
        await consumer.stop()


async def process_message(message):
    print(message)
    return message


async def process_messages(queue, output_queue):
    while True:
        message = await queue.get()
        processed_message = await process_message(message)
        await output_queue.put(processed_message)
        queue.task_done()


# async def produce(output_queue):
#     producer = AIOKafkaProducer(
#         bootstrap_servers=BROKERS,
#         value_serializer=lambda m: json.dumps(m).encode('utf-8')
#     )
#     await producer.start()
#     try:
#         while True:
#             message = await output_queue.get()
#             print(message)
#             await producer.send_and_wait(TOPIC_OUTPUT, message)
#             output_queue.task_done()
#     finally:
#         await producer.stop()


async def main():
    queue = asyncio.Queue(maxsize=1000000)
    output_queue = asyncio.Queue(maxsize=1000000)

    consumers = [asyncio.create_task(consume(queue)) for i in range(4)]
    # producers = [asyncio.create_task(produce(output_queue)) for i in range(3)]
    process_task = asyncio.create_task(process_messages(queue, output_queue))

    # await asyncio.gather(*consumers, *producers, process_task)
    await asyncio.gather(*consumers, process_task)



if __name__ == '__main__':
    asyncio.run(main())

0 ответов

Другие вопросы по тегам