Kafka Node High Level Producer пишет только на четные разделы

Я использую библиотеку Kafka Node и тестирую продюсера высокого уровня.

Я создал тему с 10 разделами 'HLPTestInput' и написал функцию, которая генерирует для нее каждую секунду.

Производитель пишет в разделы 0,2,4,6 и 8, но не в нечетные.

Как ни странно, когда я использую эту тему и выводу на вторую тему, 'HLPTestInputFromConsumer', которая имеет 5 разделов, сообщения записываются во все из них.

Мне не хватает конфигурации?

const kafka = require('kafka-node'),
    HighLevelProducer = kafka.HighLevelProducer,
    ConsumerGroup = kafka.ConsumerGroup,
    client = new kafka.KafkaClient({kafkaHost: 'smc-dev.silverbolt.lab:9092'}),
    producer = new HighLevelProducer(client),
    consumer = new ConsumerGroup(
        {
          kafkaHost: 'smc-dev.silverbolt.lab:9092',
            groupId: 'testGroup'
        },
        'HLPTestInput'
    );

let index = 0;
setInterval(() => {
    producer.send([{
        topic: 'HLPTestInput',
        messages: [index]
    }], (err, data) => {
        console.log('produced', data);
    });
    index++;
}, 1000);

consumer.on('message', (message) => {
    console.log('consumed', message);
    producer.send([{
        topic: 'HLPTestInputFromConsumer',
        messages: [message]
    }], (err, data) => {
        console.log('produced to secondary', data);
    });
});

0 ответов

Я не уверен, но это может быть из-за того, что вы используете одного и того же продюсера для написания на две разные темы. Поскольку HighLevelProducer для записи использует циклический перебор. Итак, предположим, что ваш продюсер пишет в теме "HLPTestInput", а затем вы устанавливаете временной интервал на 1000, чтобы в это время ваш потребитель получил сообщение, а теперь ваш производитель пишет в теме "HLPTestInputFromConsumer".

Итак, ваш продюсер пишет тему "HLPTestInput" в своих разделах 0,2,4...

и тему "HLPTestInputFromConsumer" в разделе 1,3,5 ...

Поэтому я предлагаю просто попробовать создать другого производителя. Тогда все должно работать нормально.

попробуйте ниже код:

const kafka = require('kafka-node'),
    HighLevelProducer = kafka.HighLevelProducer,
    ConsumerGroup = kafka.ConsumerGroup,
    client = new kafka.KafkaClient({kafkaHost: 'smc-dev.silverbolt.lab:9092'}),
    client1 = new kafka.KafkaClient({kafkaHost: 'smc-dev.silverbolt.lab:9092'}),
    producer = new HighLevelProducer(client),
    producer1 = new HighLevelProducer(client1),
    consumer = new ConsumerGroup(
       {
          kafkaHost: 'smc-dev.silverbolt.lab:9092',
           groupId: 'testGroup'
        },
        'HLPTestInput'
    );
let index = 0;
    setInterval(() => {
    producer.send([{
        topic: 'HLPTestInput',
        messages: [index]
    }], (err, data) => {
        console.log('produced', data);
    });
   index++;
}, 1000);

consumer.on('message', (message) => {
    console.log('consumed', message);
    producer1.send([{
        topic: 'HLPTestInputFromConsumer',
        messages: [message]
    }], (err, data) => {
        console.log('produced to secondary', data);
    });
});
Другие вопросы по тегам