Кафка 0.10.1.0 смена времени смещения
Конвейер Elasticsearch, настроенный с кластером Kafka между 2 экземплярами logstash. Мне нужно сбросить смещение назад на 12 часов для темы и снова запустить потребителя.
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kfkserver:9092 --topic topicname --time 1488153601000
который возвращается topicname:0:3730858
1488153601000 <- 2017-02-27 00:00:01 в миллисекундах
Как я могу установить время смещения?
1 ответ
Если вы используете 0.10.x и у вас нет замечательного инструмента управления смещениями, который был добавлен в 0.11, есть способ использовать kafka-console-consumer.sh, чтобы изменить смещение группы потребителей. Это работает только с числовым смещением, но не с отметкой времени.
Во-первых, остановите любой запущенный процесс, использующий этого потребителя. Чистое отключение лучше. Затем выполните команду, которая выглядит следующим образом:
bin/kafka-console-consumer.sh --bootstrap-server :9092 \
--topic my-topic \
--partition 1 \
--consumer-property group.id=my-consumer-group \
--max-messages 0 \
--offset 12345
--max-messages 0
здесь важно; установка его в любое другое значение, включая 1, потребляет столько сообщений, а затем фиксирует текущее последнее смещение в этой теме / разделе. Это должно быть ошибка в консоли потребителя.
Далее, проверьте свою работу с kafka-consumer-groups.sh:
./kafka-consumer-groups.sh --bootstrap-server :9092 \
--group my-consumer-group \
--describe