Kafka Node High Level Producer пишет только на четные разделы
Я использую библиотеку Kafka Node и тестирую продюсера высокого уровня.
Я создал тему с 10 разделами 'HLPTestInput' и написал функцию, которая генерирует для нее каждую секунду.
Производитель пишет в разделы 0,2,4,6 и 8, но не в нечетные.
Как ни странно, когда я использую эту тему и выводу на вторую тему, 'HLPTestInputFromConsumer', которая имеет 5 разделов, сообщения записываются во все из них.
Мне не хватает конфигурации?
const kafka = require('kafka-node'),
HighLevelProducer = kafka.HighLevelProducer,
ConsumerGroup = kafka.ConsumerGroup,
client = new kafka.KafkaClient({kafkaHost: 'smc-dev.silverbolt.lab:9092'}),
producer = new HighLevelProducer(client),
consumer = new ConsumerGroup(
{
kafkaHost: 'smc-dev.silverbolt.lab:9092',
groupId: 'testGroup'
},
'HLPTestInput'
);
let index = 0;
setInterval(() => {
producer.send([{
topic: 'HLPTestInput',
messages: [index]
}], (err, data) => {
console.log('produced', data);
});
index++;
}, 1000);
consumer.on('message', (message) => {
console.log('consumed', message);
producer.send([{
topic: 'HLPTestInputFromConsumer',
messages: [message]
}], (err, data) => {
console.log('produced to secondary', data);
});
});
0 ответов
Я не уверен, но это может быть из-за того, что вы используете одного и того же продюсера для написания на две разные темы. Поскольку HighLevelProducer для записи использует циклический перебор. Итак, предположим, что ваш продюсер пишет в теме "HLPTestInput", а затем вы устанавливаете временной интервал на 1000, чтобы в это время ваш потребитель получил сообщение, а теперь ваш производитель пишет в теме "HLPTestInputFromConsumer".
Итак, ваш продюсер пишет тему "HLPTestInput" в своих разделах 0,2,4...
и тему "HLPTestInputFromConsumer" в разделе 1,3,5 ...
Поэтому я предлагаю просто попробовать создать другого производителя. Тогда все должно работать нормально.
попробуйте ниже код:
const kafka = require('kafka-node'),
HighLevelProducer = kafka.HighLevelProducer,
ConsumerGroup = kafka.ConsumerGroup,
client = new kafka.KafkaClient({kafkaHost: 'smc-dev.silverbolt.lab:9092'}),
client1 = new kafka.KafkaClient({kafkaHost: 'smc-dev.silverbolt.lab:9092'}),
producer = new HighLevelProducer(client),
producer1 = new HighLevelProducer(client1),
consumer = new ConsumerGroup(
{
kafkaHost: 'smc-dev.silverbolt.lab:9092',
groupId: 'testGroup'
},
'HLPTestInput'
);
let index = 0;
setInterval(() => {
producer.send([{
topic: 'HLPTestInput',
messages: [index]
}], (err, data) => {
console.log('produced', data);
});
index++;
}, 1000);
consumer.on('message', (message) => {
console.log('consumed', message);
producer1.send([{
topic: 'HLPTestInputFromConsumer',
messages: [message]
}], (err, data) => {
console.log('produced to secondary', data);
});
});