невозможно получить асинхронное сообщение от производителя и потребителя
Кафка, смотритель зоопарка, работает успешно
Это мой продюсер.py
async def publish():
producer = AIOKafkaProducer(bootstrap_servers='localhost:9092',
enable_idempotence=True)
await producer.start()
consumer = AIOKafkaConsumer(
topicAKG,
bootstrap_servers='localhost:9092',group_id='test',
max_poll_interval_ms=60000,
max_poll_records=50)
await consumer.start()
try:
for i in range(1, 6):
await producer.send_and_wait(topic, value='from producer'.encode())
print(f"Iteration: {i}")
async for message in consumer:
print("Received ========== ", message.value.decode())
await consumer.commit()
finally:
await producer.stop()
await consumer.stop()
Это мой потребитель.py
импортировать asyncio из aiokafka, импортировать AIOKafkaConsumer, AIOKafkaProducer
topic = 'app'
topicAKG = 'back'
async def consume():
consumer = AIOKafkaConsumer(topic, bootstrap_servers='localhost:9092',
group_id="test",
max_poll_interval_ms=60000,
max_poll_records=50)
await consumer.start()
producer = AIOKafkaProducer(bootstrap_servers='localhost:9092',
enable_idempotence=True)
await producer.start()
try:
async for message in consumer:
print("Received",message.value.decode())
await asyncio.sleep(2) # Delay for 3 seconds
await consumer.commit() # Commit the offset to avoid re-consuming the same message
await producer.send_and_wait(topicAKG, value='from consumer'.encode())
finally:
await consumer.stop()
await producer.stop()
loop = asyncio.get_event_loop()
loop.run_until_complete(consume())
выход от производителя
Итерация:1
Получено ========== от потребителя
выпуск от потребителя
Получено от производителя
поэтому он застрял на итерации 1 и завис
1 ответ
используйте перерыв в Producer.out цикла после получения первого сообщения
try:
for i in range(1, 6):
await producer.send_and_wait(topic, value='from producer'.encode())
print(f"Iteration: {i}")
async for message in consumer:
print("Received ========== ", message.value.decode())
await consumer.commit()
break
измените согласно вашему Producer.py