невозможно получить асинхронное сообщение от производителя и потребителя

Кафка, смотритель зоопарка, работает успешно

Это мой продюсер.py

      async def publish():
    producer = AIOKafkaProducer(bootstrap_servers='localhost:9092',
    enable_idempotence=True)  
    await producer.start()

    consumer = AIOKafkaConsumer(
    topicAKG,
    bootstrap_servers='localhost:9092',group_id='test',
    max_poll_interval_ms=60000,
    max_poll_records=50)
    await consumer.start()

    try:
        for i in range(1, 6):
            await producer.send_and_wait(topic, value='from producer'.encode())
            print(f"Iteration: {i}")
            async for message in consumer:
                print("Received ========== ", message.value.decode())
                await consumer.commit()
    finally:
        await producer.stop()
        await consumer.stop()

Это мой потребитель.py

импортировать asyncio из aiokafka, импортировать AIOKafkaConsumer, AIOKafkaProducer

      topic = 'app'
topicAKG = 'back'

async def consume():
    consumer = AIOKafkaConsumer(topic, bootstrap_servers='localhost:9092',
    group_id="test",
    max_poll_interval_ms=60000,
    max_poll_records=50)
    await consumer.start()

    producer = AIOKafkaProducer(bootstrap_servers='localhost:9092',
                                enable_idempotence=True)
    await producer.start()

    try:
        async for message in consumer:
            print("Received",message.value.decode())
            await asyncio.sleep(2)  # Delay for 3 seconds
            await consumer.commit()  # Commit the offset to avoid re-consuming the same message
            await producer.send_and_wait(topicAKG, value='from consumer'.encode())
    finally:
        await consumer.stop()
        await producer.stop()

loop = asyncio.get_event_loop()
loop.run_until_complete(consume())

выход от производителя

Итерация:1

Получено ========== от потребителя

выпуск от потребителя

Получено от производителя

поэтому он застрял на итерации 1 и завис

1 ответ

используйте перерыв в Producer.out цикла после получения первого сообщения

       try:
        for i in range(1, 6):
            await producer.send_and_wait(topic, value='from producer'.encode())
            print(f"Iteration: {i}")
            async for message in consumer:
                print("Received ========== ", message.value.decode())
                await consumer.commit()
                break

измените согласно вашему Producer.py

Другие вопросы по тегам