Потребляйте из кафки без бесконечного цикла
В настоящее время я использую клиент Python Confluent kafka для получения сообщений из темы kafka, и код отлично работает внутри while True
цикл, как показано в примерах в документации. Однако я хотел бы настроить задание cron, которое потребляет из темы только один раз в день. Идея состоит в том, что задание будет проверять тему утром, обработать все сообщения в теме на данный момент времени и затем остановиться. Я пробовал добиться этого на Python вот так:
msg = kafka_consumer.consume()
while msg:
msg_val = msg.value().decode('utf-8')
// do something with msg
msg = kafka_consumer.consume()
Проблема в том, что он никогда ничего не потребляет. Я предполагаю, что первая строка никогда не получает сообщение с первой попытки. Работает только сwhile True
но я не хочу, чтобы этот код выполнялся бесконечно, пока не будет использовано последнее сообщение в этот момент времени.
1 ответ
Вы можете проверить смещения группы потребителей внутри цикла, а затем прервать цикл, когда вы окажетесь в пределах некоторого порогового значения "конца".
Вы также можете поиграть с max.poll.records
конфигурация потребителя, чтобы дать больше контроля над тем, сколько записей вы получите