Потребительские записи Kafka — обработка

В Kafka я использую getmany для чтения сообщений потребителей. Всего из 650 сообщений (обработка которых займет около 3 дней) обработка происходит примерно для 100-150 записей (иногда 12 часов, а иногда и 24 часов), после чего дальнейшая обработка не происходит. Но потребительский поток не закрывается, когда я помещаю новое сообщение, оно обрабатывается, но не могу понять, почему это происходит.

Я подумал, что, возможно, некоторые сообщения пропускаются, поэтому, чтобы проверить, я пытался создать только 25 сообщений - все обработано (3 часа), затем было создано 100 сообщений - все обработано (12 часов), в этих ситуациях сообщения не пропускаются. Эта проблема возникает только тогда, когда выводится много сообщений (650).

Фрагмент кода:

      async def consume(loop,lock):
    logger.info('Inside Consume')

    consumer = AIOKafkaConsumer(KAFKA_TOPIC,
                        loop=loop,
                        bootstrap_servers=bootstrap_servers,
                        group_id=group_id,           
                        enable_auto_commit=enable_auto_commit,       
                        auto_commit_interval_ms=auto_commit_interval_ms,  
                        auto_offset_reset=auto_offset_reset,  
                        max_poll_records= 1,
                        max_poll_interval_ms=1500000,
                        rebalance_timeout_ms=1500000)
              
    await consumer.start()
    while True:
        result = await consumer.getmany(timeout_ms=1500000, max_records=1)
        for tp, msg in result.items(): 
            if msg:
                message = msg[0].value.decode()
                message_json = json.loads(message)
                data_encoded = message_json['payload']

                if data_encoded.get('content') is not None:
                    msg = data_encoded['content']
                    message_dict_dum = base64.b64decode(msg)
                    message_dict = json.loads(message_dict_dum)
                    
                    if message_dict.get("key1") is not None:
                        if message_dict["key1"] == "something":
                            logger.info('Hurray new message to process')
                            logger.info(message_dict["id"])
                            await process(data_encoded['content'],lock)
                            

async def process(msg, lock):
    async with lock:
        logger.info('processing... ->{}'.format(msg))
        await processmessage(msg)
        logger.info('done processing')        


if __name__ == "__main__":
    logger.info('Inside main')

    loop = asyncio.new_event_loop()
    lock = asyncio.Lock(loop=loop)

    loop.create_task(consume(loop, lock))
    loop.run_forever()

0 ответов

Другие вопросы по тегам