Перезапуск потребителя Kafka (python) снова потребляет все сообщения в очереди

Я использую Kafka 0.8.1 и Kafka python-0.9.0. В моей настройке у меня есть 2 брокера кафки. Когда я запускаю своего потребителя кафки, я вижу, как он извлекает сообщения из очереди и отслеживает смещения для обоих брокеров. Все отлично работает!

Моя проблема в том, что когда я перезапускаю потребителя, он начинает потреблять сообщения с самого начала. Я ожидал, что при перезапуске потребитель начнет потреблять сообщения с того места, где он остановился до того, как умер.

Я попытался отследить смещение сообщений в Redis, а затем вызвал consumer.seek перед чтением сообщения из очереди, чтобы убедиться, что я получаю только те сообщения, которых раньше не видел. Несмотря на то, что это сработало, перед развертыванием этого решения я хотел уточнить у вас... возможно, есть что-то, что я неправильно понимаю в Kafka или клиенте python-Kafka. Похоже, что потребитель может возобновить чтение с того места, где он остановился, - это довольно базовая функциональность.

Спасибо!

3 ответа

Берегите себя с библиотекой kafka-python. У него есть несколько мелких проблем.

Если скорость не является проблемой для вашего потребителя, вы можете установить автоматическую фиксацию в каждом сообщении. Это должно работать.

SimpleConsumer предоставляет seek метод ( https://github.com/mumrah/kafka-python/blob/master/kafka/consumer/simple.py), который позволяет вам начать использовать сообщения в любой точке, которую вы хотите.

Самые обычные звонки:

  • consumer.seek(0, 0) начать чтение с начала очереди.
  • consumer.seek(0, 1) начать чтение с текущего смещения.
  • consumer.seek(0, 2) пропустить все ожидающие сообщения и начать читать только новые сообщения.

Первый аргумент - это смещение этих позиций. Таким образом, если вы позвоните consumer.seek(5, 0) вы пропустите первые 5 сообщений из очереди.

Также не забывайте, что смещение сохраняется для групп потребителей. Убедитесь, что вы используете один и тот же все время.

kafka-python хранит смещения с сервером kafka, а не через отдельное соединение zookeeper. К сожалению, apis-сервер kafka для поддержки смещений фиксации / извлечения не был полностью функциональным, пока apache kafka 0.8.1.1. Если вы обновляете свой сервер kafka, ваши настройки должны работать. Я бы также предложил обновить kafka-python до 0.9.4.

[сопровождающий kafka-python]

Прежде всего, вам нужно установить group_id, записывая смещение, чтобы он возобновил потребление сообщения из этого group_id.

Если вы уже использовали все существующие сообщения в группе, вы хотите повторно использовать сообщения еще раз. вы можете использоватьseek для достижения этой цели.

Вот пример:

def test_consume_from_offset(offset):
    topic = 'test'
    consumer = KafkaConsumer(bootstrap_servers=broker_list, group_id='test')
    tp = TopicPartition(topic=topic, partition=0)
    consumer.assign([tp])
    consumer.seek(tp, offset)   # you can set the offset you want to resume from.
    for msg in consumer:
        # the msg begins with the offset you set
        print(msg)

test_consume_from_offset(10)

Потребитель Кафки может хранить смещения в Zookeeper. В Java API у нас есть два варианта: потребитель высокого уровня, который управляет состоянием для нас и начинает потреблять с того места, где он оставался после перезапуска, и потребитель низкого уровня без состояния без этой сверхдержавы.

Из того, что я понимаю в потребительском коде Python ( https://github.com/mumrah/kafka-python/blob/master/kafka/consumer.py), и SimpleConsumer, и MultiProcessConsumer находятся в состоянии и отслеживают текущие смещения в Zookeeper, поэтому странно, что у вас есть эта проблема повторного использования.

Убедитесь, что у вас одинаковые идентификаторы группы потребителей при перезапуске (возможно, вы установили его случайным образом?) И отметьте следующие параметры:

auto_commit: default True. Whether or not to auto commit the offsets
auto_commit_every_n: default 100. How many messages to consume
                     before a commit
auto_commit_every_t: default 5000. How much time (in milliseconds) to
                     wait before commit

Может быть, вы потребляете < 100 сообщений или < 5000 мс?

Вам просто нужно убедиться, что ваш Kafka Consumer начинает чтение с последнего смещения (auto.offset.reset="latest"). Также убедитесь, что вы определили группу потребителей, чтобы смещения могли быть зафиксированы, а когда потребитель упал, мог выбрать свою последнюю зафиксированную позицию.


С помощью confluent-kafka-python

from confluent_kafka import Consumer


c = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'auto.offset.reset': 'latest'
})

c.subscribe(['my_topic'])

С помощью kafka-python

from kafka import KafkaConsumer


consumer = KafkaConsumer(
    'my_topic', 
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='latest', 
    enable_auto_commit=True,
    group_id='mygroup'
)
Другие вопросы по тегам