Kafka-node внезапно потребляет со смещения 0
Иногда потребитель kafka-node начинает потреблять со смещения 0, тогда как по умолчанию он потребляет только новые сообщения. Тогда он не вернется к своему поведению по умолчанию. Вы знаете, как это решить, и что происходит, и его поведение внезапно меняется? Код очень простой, и это происходит без изменения кода.
var kafka = require("kafka-node") ;
Consumer = kafka.Consumer;
client = new kafka.KafkaClient();
consumer = new Consumer(client, [{ topic: "Topic_23", partition: 0}
]);
consumer.on("message", function(message) {
console.log(message)
});
Единственное решение, которое я нашел до сих пор, - это сменить тему кафки. Потом снова все работает нормально. Есть идеи?
1 ответ
В Kafka смещения не связаны с конкретными потребителями, а вместо этого связаны с группами потребителей. В вашем коде вы не предоставляете группу потребителей, поэтому каждый раз, когда вы запускаете потребителя, он назначается другой группе потребителей, и, таким образом, смещение начинается с0
.
Следующее должно помочь (очевидно, в первый раз, когда вы собираетесь читать все сообщения):
var kafka = require("kafka-node") ;
Consumer = kafka.Consumer;
client = new kafka.KafkaClient();
payload = [{
topic: "Topic_23",
partition: 0
}]
var options = {
groupId: 'test-consumer-group',
fromOffset: 'latest'
};
consumer = new Consumer(client, payload, options);
consumer.on("message", function(message) {
console.log(message)
});