Создание раздела для темы в kafka-node
Я создал HighLevelProducer для публикации сообщений в потоке темы, который будет использоваться ConsumerGroupStream с использованием kafka-node. Когда я создаю несколько потребителей из одной группы ConsumerGroup для использования из одной и той же темы, создается только один раздел и только один потребитель использует. Я также попытался определить количество разделов для этой темы, хотя я не уверен, требуется ли определять его при создании темы, и если да, то сколько разделов мне понадобится заранее. Кроме того, можно ли отправить объект в поток Transform, а не строку (в настоящее время я использовал JSON.stringify, потому что в противном случае я получил бы [Object object] в потребителе.
const myProducerStream = ({ kafkaHost, highWaterMark, topic }) => {
const kafkaClient = new KafkaClient({ kafkaHost });
const producer = new HighLevelProducer(kafkaClient);
const options = {
highWaterMark,
kafkaClient,
producer
};
kafkaClient.refreshMetadata([topic], err => {
if (err) throw err;
});
return new ProducerStream(options);
};
const transfrom = topic => new Transform({
objectMode: true,
decodeStrings: true,
transform(obj, encoding, cb) {
console.log(`pushing message ${JSON.stringify(obj)} to topic "${topic}"`);
cb(null, {
topic,
messages: JSON.stringify(obj)
});
}
});
const publisher = (topic, kafkaHost, highWaterMark) => {
const myTransfrom = transfrom(topic);
const producer = myProducerStream({ kafkaHost, highWaterMark, topic });
myTransfrom.pipe(producer);
return myTransform;
};
Потребитель:
const createConsumerStream = (sourceTopic, kafkaHost, groupId) => {
const consumerOptions = {
kafkaHost,
groupId,
protocol: ['roundrobin'],
encoding: 'utf8',
id: uuidv4(),
fromOffset: 'latest',
outOfRangeOffset: 'earliest',
};
const consumerGroupStream = new ConsumerGroupStream(consumerOptions, sourceTopic);
consumerGroupStream.on('connect', () => {
console.log(`Consumer id: "${consumerOptions.id}" is connected!`);
});
consumerGroupStream.on('error', (err) => {
console.error(`Consumer id: "${consumerOptions.id}" encountered an error: ${err}`);
});
return consumerGroupStream;
};
const publisher = (func, destTopic, consumerGroupStream, kafkaHost, highWaterMark) => {
const messageTransform = new AsyncMessageTransform(func, destTopic);
const resultProducerStream = myProducerStream({ kafkaHost, highWaterMark, topic: destTopic })
consumerGroupStream.pipe(messageTransform).pipe(resultProducerStream);
};
1 ответ
По первому вопросу: максимальное количество работающих потребителей в группе равно количеству разделов.
Итак, если у вас есть TopicA с 1 разделом и у вас есть 5 потребителей в вашей группе потребителей, 4 из них будут простаивать.
Если у вас есть TopicA с 5 разделами и у вас есть 5 потребителей в вашей группе потребителей, все они будут активны и будут получать сообщения из вашей темы.
Чтобы указать количество разделов, вы должны создать тему из CLI, а не ожидать, что Kafka создаст ее при первой публикации сообщений.
Чтобы создать тему с определенным количеством разделов:
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic test
Чтобы изменить количество разделов в уже существующей теме:
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic test
--partitions 40
Обратите внимание, что вы можете только увеличить количество разделов, вы не можете их уменьшить.
См. Документы Kafka https://kafka.apache.org/documentation.html.
Также, если вы хотите узнать больше о Kafka, загляните в бесплатную книгу https://www.confluent.io/resources/kafka-the-definitive-guide/