Storm Kafka Spout не фиксирует смещение в локальном кластере, spout получает одно и то же сообщение повторно
Я установил топологию шторма, которая получает входные данные с сервера kafka. Я использовал пакет kafka-storm для получения данных. Я успешно установил соединение между сервером kafka и топологией шторма в локальном кластере, но столкнулся с некоторыми проблемами при получении данных с сервера kafka.
Кафка Носик получает одно и то же сообщение повторно во время выполнения, даже если я установил spoutconfig.forceFromStart=false
а также spoutconfig.startOffsetTime =-1
Примечание. Когда я останавливаю и перезапускаю кластер, данные отправляются корректно на основе последнего смещения.
1 ответ
Сам разобрался, проблема с outputcollector
ack()
метод. Я реализовал болт коллектор с BaseBasicBolt
не признал кафкаспут. У меня заменить на BaseRichBolt
и сделал this.collector.ack(tuple)
вручную.
Теперь его работа отлично