Kafka-node внезапно потребляет со смещения 0

Иногда потребитель kafka-node начинает потреблять со смещения 0, тогда как по умолчанию он потребляет только новые сообщения. Тогда он не вернется к своему поведению по умолчанию. Вы знаете, как это решить, и что происходит, и его поведение внезапно меняется? Код очень простой, и это происходит без изменения кода.

var kafka = require("kafka-node") ;
  Consumer = kafka.Consumer;
  client = new kafka.KafkaClient();


  consumer = new Consumer(client, [{ topic: "Topic_23", partition: 0}
                                    ]);


consumer.on("message", function(message) {

    console.log(message)


  });

Единственное решение, которое я нашел до сих пор, - это сменить тему кафки. Потом снова все работает нормально. Есть идеи?

1 ответ

В Kafka смещения не связаны с конкретными потребителями, а вместо этого связаны с группами потребителей. В вашем коде вы не предоставляете группу потребителей, поэтому каждый раз, когда вы запускаете потребителя, он назначается другой группе потребителей, и, таким образом, смещение начинается с0.

Следующее должно помочь (очевидно, в первый раз, когда вы собираетесь читать все сообщения):

var kafka = require("kafka-node") ;

Consumer = kafka.Consumer;
client = new kafka.KafkaClient();

payload = [{
    topic: "Topic_23", 
    partition: 0
}]

var options = {
    groupId: 'test-consumer-group',
    fromOffset: 'latest'
};


consumer = new Consumer(client, payload, options);
consumer.on("message", function(message) {
    console.log(message)
  });
Другие вопросы по тегам