Потребительский API Kafka Python ничего не возвращает
Я использую Kafka-Python для чтения темы от брокера Kafka, но я не могу заставить потребительский итератор что-либо возвращать
consumer = KafkaConsumer("topic",bootstrap_servers=bootstrap_server + ":" + str(port), group_id="mygroup")
for record in consumer:
print(record)
Кажется, он просто висит. Я проверил, что тема существует и имеет данные о брокере и что новые данные создаются. Когда я меняю звонок на KafkaConsumer
конструктор и добавить auto_offset_reset="earliest"
все работает, как ожидалось, и потребительский итератор возвращает записи. Значение по умолчанию для этого параметра "latest"
, но с этим значением я не могу использовать данные.
Почему это так?
1 ответ
Вы также должны включить auto_offset_reset='smallest'
при создании экземпляра KafkaConsumer
что эквивалентно --from-beginning
для инструмента командной строки kafka-console-consumer.sh
т.е.
consumer = KafkaConsumer("topic",bootstrap_servers=bootstrap_server + ":" + str(port), group_id="mygroup", auto_offset_reset='smallest')
Причина, по которой вы можете увидеть, что данные не потребляются, вероятно, заключается в том, что когда ваш потребитель запущен и работает, данные не производятся со стороны производителя. Следовательно, вам нужно указать, что вы хотите использовать все данные в теме (даже если в данный момент данные не вставляются).
Согласно официальной документации:
Потребитель Kafka работает, отправляя запросы "fetch" брокерам, ведущим разделы, которые он хочет использовать. Потребитель указывает свое смещение в журнале с каждым запросом и получает обратно часть журнала, начиная с этой позиции. Таким образом, потребитель имеет значительный контроль над этой позицией и может перемотать ее, чтобы при необходимости повторно использовать данные.