Потребитель Aiokafka застрял в бесконечном цикле

Привет, я использую потребителя AioKafka для чтения сообщения, опубликованного другим процессом. Другой процесс только что опубликовал одно сообщение, и мой потребительский код бесконечно читает одно и то же сообщение. Я пытался использовать ручную фиксацию, но тщетно. Я использую библиотеку: https://pypi.org/project/aiokafka-commit/ Я хочу читать каждое сообщение только один раз, когда оно доступно.

      from aiokafka import AIOKafkaConsumer
import asyncio
        
async def consume():
    consumer = AIOKafkaConsumer('websocket_chat_kafka',
                                bootstrap_servers = "127.0.0.1:9092", 
                                group_id = 'gid',
                                client_id = 'cid',
                                # enable_auto_commit = True,
                                # auto_commit_interval_ms=1000,
                                auto_offset_reset="latest")

    # await consumer.start()
    # while True:
    #     msg = await consumer.getone()
    #     print(msg)
    #     await consumer.commit()

    await consumer.start()
    # await consumer.seek_to_committed()
    
    async for msg in consumer:
        print(msg)
        await consumer.commit()

    await consumer.stop()

asyncio.run(consume())

Любые идеи, что я делаю неправильно?

Спасибо.

0 ответов

Другие вопросы по тегам