Потребительские записи Kafka — обработка
В Kafka я использую getmany для чтения сообщений потребителей. Всего из 650 сообщений (обработка которых займет около 3 дней) обработка происходит примерно для 100-150 записей (иногда 12 часов, а иногда и 24 часов), после чего дальнейшая обработка не происходит. Но потребительский поток не закрывается, когда я помещаю новое сообщение, оно обрабатывается, но не могу понять, почему это происходит.
Я подумал, что, возможно, некоторые сообщения пропускаются, поэтому, чтобы проверить, я пытался создать только 25 сообщений - все обработано (3 часа), затем было создано 100 сообщений - все обработано (12 часов), в этих ситуациях сообщения не пропускаются. Эта проблема возникает только тогда, когда выводится много сообщений (650).
Фрагмент кода:
async def consume(loop,lock):
logger.info('Inside Consume')
consumer = AIOKafkaConsumer(KAFKA_TOPIC,
loop=loop,
bootstrap_servers=bootstrap_servers,
group_id=group_id,
enable_auto_commit=enable_auto_commit,
auto_commit_interval_ms=auto_commit_interval_ms,
auto_offset_reset=auto_offset_reset,
max_poll_records= 1,
max_poll_interval_ms=1500000,
rebalance_timeout_ms=1500000)
await consumer.start()
while True:
result = await consumer.getmany(timeout_ms=1500000, max_records=1)
for tp, msg in result.items():
if msg:
message = msg[0].value.decode()
message_json = json.loads(message)
data_encoded = message_json['payload']
if data_encoded.get('content') is not None:
msg = data_encoded['content']
message_dict_dum = base64.b64decode(msg)
message_dict = json.loads(message_dict_dum)
if message_dict.get("key1") is not None:
if message_dict["key1"] == "something":
logger.info('Hurray new message to process')
logger.info(message_dict["id"])
await process(data_encoded['content'],lock)
async def process(msg, lock):
async with lock:
logger.info('processing... ->{}'.format(msg))
await processmessage(msg)
logger.info('done processing')
if __name__ == "__main__":
logger.info('Inside main')
loop = asyncio.new_event_loop()
lock = asyncio.Lock(loop=loop)
loop.create_task(consume(loop, lock))
loop.run_forever()