Потребительский API Kafka Python ничего не возвращает

Я использую Kafka-Python для чтения темы от брокера Kafka, но я не могу заставить потребительский итератор что-либо возвращать

consumer = KafkaConsumer("topic",bootstrap_servers=bootstrap_server + ":" + str(port), group_id="mygroup")

for record in consumer:
    print(record)

Кажется, он просто висит. Я проверил, что тема существует и имеет данные о брокере и что новые данные создаются. Когда я меняю звонок на KafkaConsumer конструктор и добавить auto_offset_reset="earliest" все работает, как ожидалось, и потребительский итератор возвращает записи. Значение по умолчанию для этого параметра "latest", но с этим значением я не могу использовать данные.

Почему это так?

1 ответ

Вы также должны включить auto_offset_reset='smallest' при создании экземпляра KafkaConsumer что эквивалентно --from-beginning для инструмента командной строки kafka-console-consumer.sh

т.е.

consumer = KafkaConsumer("topic",bootstrap_servers=bootstrap_server + ":" + str(port), group_id="mygroup", auto_offset_reset='smallest')

Причина, по которой вы можете увидеть, что данные не потребляются, вероятно, заключается в том, что когда ваш потребитель запущен и работает, данные не производятся со стороны производителя. Следовательно, вам нужно указать, что вы хотите использовать все данные в теме (даже если в данный момент данные не вставляются).

Согласно официальной документации:

Потребитель Kafka работает, отправляя запросы "fetch" ​​брокерам, ведущим разделы, которые он хочет использовать. Потребитель указывает свое смещение в журнале с каждым запросом и получает обратно часть журнала, начиная с этой позиции. Таким образом, потребитель имеет значительный контроль над этой позицией и может перемотать ее, чтобы при необходимости повторно использовать данные.

Другие вопросы по тегам