как заставить autocommit false работать в kafkajs

Сценарий : я
устанавливаю autocommit false,
создавая 12 сообщений, использующих
их... (скажем, со смещения 100),
завершение работы потребителя,
запуск нового потребителя

на этом этапе я ожидаю, что второй потребитель снова прочитает все сообщения, начиная со смещения 100 (потому что фиксация не была сделана)

но при создании новых сообщений я вижу, что 2-й потребитель начинает с нового смещения (113), то есть фиксация все еще каким-то образом происходит..

что я ошибаюсь?

это мой потребительский код

const { Kafka } = require('kafkajs');
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['192.168.14.10:9095']
});
const admin = kafka.admin();
const consumer = kafka.consumer({ groupId: 'test-group' });
const run = async () => {
  // admin
  await admin.connect();
  // Consuming
  await consumer.connect();
  await consumer.subscribe({ topic: 'topic-test2'});
  await consumer.run({
    autoCommit: false,
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        partition,
        offset: message.offset,
        value: message.value.toString()
      });
      }
    }
  });
};
run().catch(console.error);

0 ответов

Другие вопросы по тегам