Кафка 0.10.1.0 смена времени смещения

Конвейер Elasticsearch, настроенный с кластером Kafka между 2 экземплярами logstash. Мне нужно сбросить смещение назад на 12 часов для темы и снова запустить потребителя.

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list  kfkserver:9092 --topic topicname --time 1488153601000

который возвращается topicname:0:3730858

1488153601000 <- 2017-02-27 00:00:01 в миллисекундах

Как я могу установить время смещения?

1 ответ

Если вы используете 0.10.x и у вас нет замечательного инструмента управления смещениями, который был добавлен в 0.11, есть способ использовать kafka-console-consumer.sh, чтобы изменить смещение группы потребителей. Это работает только с числовым смещением, но не с отметкой времени.

Во-первых, остановите любой запущенный процесс, использующий этого потребителя. Чистое отключение лучше. Затем выполните команду, которая выглядит следующим образом:

bin/kafka-console-consumer.sh --bootstrap-server :9092 \
    --topic my-topic \
    --partition 1 \
    --consumer-property group.id=my-consumer-group \
    --max-messages 0 \
    --offset 12345

--max-messages 0 здесь важно; установка его в любое другое значение, включая 1, потребляет столько сообщений, а затем фиксирует текущее последнее смещение в этой теме / разделе. Это должно быть ошибка в консоли потребителя.

Далее, проверьте свою работу с kafka-consumer-groups.sh:

./kafka-consumer-groups.sh --bootstrap-server :9092 \
    --group my-consumer-group \
    --describe
Другие вопросы по тегам