Потребляйте из кафки без бесконечного цикла

В настоящее время я использую клиент Python Confluent kafka для получения сообщений из темы kafka, и код отлично работает внутри while Trueцикл, как показано в примерах в документации. Однако я хотел бы настроить задание cron, которое потребляет из темы только один раз в день. Идея состоит в том, что задание будет проверять тему утром, обработать все сообщения в теме на данный момент времени и затем остановиться. Я пробовал добиться этого на Python вот так:

msg = kafka_consumer.consume()
while msg:
  msg_val = msg.value().decode('utf-8')
  // do something with msg
  msg = kafka_consumer.consume()

Проблема в том, что он никогда ничего не потребляет. Я предполагаю, что первая строка никогда не получает сообщение с первой попытки. Работает только сwhile True но я не хочу, чтобы этот код выполнялся бесконечно, пока не будет использовано последнее сообщение в этот момент времени.

1 ответ

Решение

Вы можете проверить смещения группы потребителей внутри цикла, а затем прервать цикл, когда вы окажетесь в пределах некоторого порогового значения "конца".

Вы также можете поиграть с max.poll.records конфигурация потребителя, чтобы дать больше контроля над тем, сколько записей вы получите

Другие вопросы по тегам