NodeJS: KafkaJSProtocolError: Поддерживаемые протоколы члена группы несовместимы с протоколами существующих членов.

Я пытаюсь захватить данные из Kafka с помощью коннектора debezium MongoDB, но я получаю сообщение об ошибке при попытке прочитать их с KafkaJS:

KafkaJSProtocolError: The group member's supported protocols are incompatible with those of existing members

Я использую изображения докера для захвата данных.

Вот шаги, я следую:

  1. Запустить Zookeeper

    docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:latest
    
  2. начать кафку

    docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:latest
    
  3. У меня MongoDB уже работает в режиме репликации

  4. Старт дебезиум кафка коннект

    docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka  debezium/connect:latest
    
  5. Затем опубликовать конфигурацию коннектора MongoDB

    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "mongodb-connector", "config": { "connector.class": "io.debezium.connector.mongodb.MongoDbConnector", "mongodb.hosts": "rs0/abc.com:27017", "mongodb.name": "fullfillment", "collection.whitelist": "mongodev.test", "mongodb.user": "kafka", "mongodb.password": "kafka01" } }'
    
  6. С этим, если я запустил контейнер докера наблюдателя, я могу данные в формате Json в консоли

    docker run -it --name watchermongo --rm --link zookeeper:zookeeper --link kafka:kafka debezium/kafka:0.9 watch-topic -a -k fullfillment.mongodev.test
    

но я хочу захватить эти данные в приложении, чтобы я мог манипулировать ими, обрабатывать их и передавать в ElasticSearch. Для этого я использую

https://github.com/tulios/kafkajs 

Но когда я запускаю потребительский код, я получаю ошибку.. Вот пример кода

//'use strict';






// clientId=connect-1, groupId=1

const { Kafka } = require('kafkajs')



const kafka = new Kafka({

  clientId: 'connect-1',

  brokers: ['localhost:9092', 'localhost:9093']

})


// Consuming

const consumer = kafka.consumer({ groupId: '1' })



var consumeMessage = async () => {



await consumer.connect()

await consumer.subscribe({ topic: 'fullfillment.mongodev.test' })



await consumer.run({

  eachMessage: async ({ topic, partition, message }) => {

    console.log({

      value: message.value.toString(),

    })

  },

})



}



consumeMessage();


KafkaJSProtocolError: The group member's supported protocols are incompatible with those of existing members

1 ответ

Решение

Вы не должны использовать один и тот же идентификатор группы в Connect и у своего потребителя KafkaJS. Если вы это сделаете, они будут частью той же группы потребителей, что означает, что сообщения будут использоваться только одним или другим, если это вообще сработало.

Если вы измените groupId вашего потребителя KafkaJS на что-то уникальное, оно должно работать.

Обратите внимание, что по умолчанию новая группа потребителей KafkaJS начнет потреблять с последнего смещения, поэтому она не будет потреблять уже созданные сообщения. Вы можете изменить это поведение с помощью fromBeginning флаг в consumer.subscribe вызов. См. https://kafka.js.org/docs/consuming.

Другие вопросы по тегам