NodeJS: KafkaJSProtocolError: Поддерживаемые протоколы члена группы несовместимы с протоколами существующих членов.
Я пытаюсь захватить данные из Kafka с помощью коннектора debezium MongoDB, но я получаю сообщение об ошибке при попытке прочитать их с KafkaJS:
KafkaJSProtocolError: The group member's supported protocols are incompatible with those of existing members
Я использую изображения докера для захвата данных.
Вот шаги, я следую:
Запустить Zookeeper
docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:latest
начать кафку
docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:latest
У меня MongoDB уже работает в режиме репликации
Старт дебезиум кафка коннект
docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka debezium/connect:latest
Затем опубликовать конфигурацию коннектора MongoDB
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "mongodb-connector", "config": { "connector.class": "io.debezium.connector.mongodb.MongoDbConnector", "mongodb.hosts": "rs0/abc.com:27017", "mongodb.name": "fullfillment", "collection.whitelist": "mongodev.test", "mongodb.user": "kafka", "mongodb.password": "kafka01" } }'
С этим, если я запустил контейнер докера наблюдателя, я могу данные в формате Json в консоли
docker run -it --name watchermongo --rm --link zookeeper:zookeeper --link kafka:kafka debezium/kafka:0.9 watch-topic -a -k fullfillment.mongodev.test
но я хочу захватить эти данные в приложении, чтобы я мог манипулировать ими, обрабатывать их и передавать в ElasticSearch. Для этого я использую
https://github.com/tulios/kafkajs
Но когда я запускаю потребительский код, я получаю ошибку.. Вот пример кода
//'use strict';
// clientId=connect-1, groupId=1
const { Kafka } = require('kafkajs')
const kafka = new Kafka({
clientId: 'connect-1',
brokers: ['localhost:9092', 'localhost:9093']
})
// Consuming
const consumer = kafka.consumer({ groupId: '1' })
var consumeMessage = async () => {
await consumer.connect()
await consumer.subscribe({ topic: 'fullfillment.mongodev.test' })
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
value: message.value.toString(),
})
},
})
}
consumeMessage();
KafkaJSProtocolError: The group member's supported protocols are incompatible with those of existing members
1 ответ
Вы не должны использовать один и тот же идентификатор группы в Connect и у своего потребителя KafkaJS. Если вы это сделаете, они будут частью той же группы потребителей, что означает, что сообщения будут использоваться только одним или другим, если это вообще сработало.
Если вы измените groupId вашего потребителя KafkaJS на что-то уникальное, оно должно работать.
Обратите внимание, что по умолчанию новая группа потребителей KafkaJS начнет потреблять с последнего смещения, поэтому она не будет потреблять уже созданные сообщения. Вы можете изменить это поведение с помощью fromBeginning
флаг в consumer.subscribe
вызов. См. https://kafka.js.org/docs/consuming.