Перезапуск потребителя Kafka (python) снова потребляет все сообщения в очереди
Я использую Kafka 0.8.1 и Kafka python-0.9.0. В моей настройке у меня есть 2 брокера кафки. Когда я запускаю своего потребителя кафки, я вижу, как он извлекает сообщения из очереди и отслеживает смещения для обоих брокеров. Все отлично работает!
Моя проблема в том, что когда я перезапускаю потребителя, он начинает потреблять сообщения с самого начала. Я ожидал, что при перезапуске потребитель начнет потреблять сообщения с того места, где он остановился до того, как умер.
Я попытался отследить смещение сообщений в Redis, а затем вызвал consumer.seek перед чтением сообщения из очереди, чтобы убедиться, что я получаю только те сообщения, которых раньше не видел. Несмотря на то, что это сработало, перед развертыванием этого решения я хотел уточнить у вас... возможно, есть что-то, что я неправильно понимаю в Kafka или клиенте python-Kafka. Похоже, что потребитель может возобновить чтение с того места, где он остановился, - это довольно базовая функциональность.
Спасибо!
3 ответа
Берегите себя с библиотекой kafka-python. У него есть несколько мелких проблем.
Если скорость не является проблемой для вашего потребителя, вы можете установить автоматическую фиксацию в каждом сообщении. Это должно работать.
SimpleConsumer предоставляет seek
метод ( https://github.com/mumrah/kafka-python/blob/master/kafka/consumer/simple.py), который позволяет вам начать использовать сообщения в любой точке, которую вы хотите.
Самые обычные звонки:
consumer.seek(0, 0)
начать чтение с начала очереди.consumer.seek(0, 1)
начать чтение с текущего смещения.consumer.seek(0, 2)
пропустить все ожидающие сообщения и начать читать только новые сообщения.
Первый аргумент - это смещение этих позиций. Таким образом, если вы позвоните consumer.seek(5, 0)
вы пропустите первые 5 сообщений из очереди.
Также не забывайте, что смещение сохраняется для групп потребителей. Убедитесь, что вы используете один и тот же все время.
kafka-python хранит смещения с сервером kafka, а не через отдельное соединение zookeeper. К сожалению, apis-сервер kafka для поддержки смещений фиксации / извлечения не был полностью функциональным, пока apache kafka 0.8.1.1. Если вы обновляете свой сервер kafka, ваши настройки должны работать. Я бы также предложил обновить kafka-python до 0.9.4.
[сопровождающий kafka-python]
Прежде всего, вам нужно установить group_id, записывая смещение, чтобы он возобновил потребление сообщения из этого group_id
.
Если вы уже использовали все существующие сообщения в группе, вы хотите повторно использовать сообщения еще раз. вы можете использоватьseek
для достижения этой цели.
Вот пример:
def test_consume_from_offset(offset):
topic = 'test'
consumer = KafkaConsumer(bootstrap_servers=broker_list, group_id='test')
tp = TopicPartition(topic=topic, partition=0)
consumer.assign([tp])
consumer.seek(tp, offset) # you can set the offset you want to resume from.
for msg in consumer:
# the msg begins with the offset you set
print(msg)
test_consume_from_offset(10)
Потребитель Кафки может хранить смещения в Zookeeper. В Java API у нас есть два варианта: потребитель высокого уровня, который управляет состоянием для нас и начинает потреблять с того места, где он оставался после перезапуска, и потребитель низкого уровня без состояния без этой сверхдержавы.
Из того, что я понимаю в потребительском коде Python ( https://github.com/mumrah/kafka-python/blob/master/kafka/consumer.py), и SimpleConsumer, и MultiProcessConsumer находятся в состоянии и отслеживают текущие смещения в Zookeeper, поэтому странно, что у вас есть эта проблема повторного использования.
Убедитесь, что у вас одинаковые идентификаторы группы потребителей при перезапуске (возможно, вы установили его случайным образом?) И отметьте следующие параметры:
auto_commit: default True. Whether or not to auto commit the offsets auto_commit_every_n: default 100. How many messages to consume before a commit auto_commit_every_t: default 5000. How much time (in milliseconds) to wait before commit
Может быть, вы потребляете < 100 сообщений или < 5000 мс?
Вам просто нужно убедиться, что ваш Kafka Consumer начинает чтение с последнего смещения (auto.offset.reset="latest"
). Также убедитесь, что вы определили группу потребителей, чтобы смещения могли быть зафиксированы, а когда потребитель упал, мог выбрать свою последнюю зафиксированную позицию.
С помощью confluent-kafka-python
from confluent_kafka import Consumer
c = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'mygroup',
'auto.offset.reset': 'latest'
})
c.subscribe(['my_topic'])
С помощью kafka-python
from kafka import KafkaConsumer
consumer = KafkaConsumer(
'my_topic',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='latest',
enable_auto_commit=True,
group_id='mygroup'
)