Стримзи - Подключение внешних клиентов
Следуя обсуждению здесь, я использовал следующие шаги, чтобы подключить внешний клиент (на основе kafkajs) к Strimzi в OpenShift. Эти шаги отсюда.
Включить внешний маршрут
В kafka-persistent-single.yaml
редактируется как показано ниже.
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
name: my-cluster
spec:
kafka:
version: 2.3.0
replicas: 1
listeners:
plain: {}
tls: {}
external:
type: route
config:
offsets.topic.replication.factor: 1
transaction.state.log.replication.factor: 1
transaction.state.log.min.isr: 1
log.message.format.version: "2.3"
storage:
type: jbod
volumes:
- id: 0
type: persistent-claim
size: 5Gi
deleteClaim: false
zookeeper:
replicas: 1
storage:
type: persistent-claim
size: 5Gi
deleteClaim: false
entityOperator:
topicOperator: {}
userOperator: {}
Выписка сертификата,
Чтобы извлечь сертификат и использовать его в клиенте, я выполнил следующую команду:
kubectl get secret my-cluster-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 -D > ca.crt
Обратите внимание, что мне пришлось использовать base64 -D
на моей macOS, а не base64 -d
как показано в документации.
Клиент Кафкайс
Это клиент, адаптированный из их npm
страницу и их документацию.
const fs = require('fs')
const { Kafka } = require('kafkajs')
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['my-cluster-kafka-bootstrap-messaging-os.192.168.99.100.nip.io'],
ssl : { rejectUnauthorized: false,
ca : [fs.readFileSync('ca.crt', 'utf-8')]
}
})
const producer = kafka.producer()
const consumer = kafka.consumer({ groupId: 'test-group' })
const run = async () => {
// Producing
await producer.connect()
await producer.send({
topic: 'test-topic',
messages: [
{ value: 'Hello KafkaJS user!' },
],
})
// Consuming
await consumer.connect()
await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
partition,
offset: message.offset,
value: message.value.toString(),
})
},
})
}
run().catch(console.error)
Вопрос
Когда я бегу node sample.js
из папки с ca.crt
, Я получаю сообщение об отказе в соединении.
{"level":"ERROR","timestamp":"2019-10-05T03:22:40.491Z","logger":"kafkajs","message":"[Connection] Connection error: connect ECONNREFUSED 192.168.99.100:9094","broker":"my-cluster-kafka-bootstrap-messaging-os.192.168.99.100.nip.io:9094","clientId":"my-app","stack":"Error: connect ECONNREFUSED 192.168.99.100:9094\n at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1113:14)"}
Что мне не хватает?
2 ответа
Я предполагаю, что проблема в том, что вам не хватает правильного порта 443 на адресе брокера, поэтому вам нужно использовать
брокеры: ['my-cluster-kafka-bootstrap-messaging-os.192.168.99.100.nip.io:443']
в противном случае он пытается подключиться к порту 80 по умолчанию на маршруте OpenShift.
После расширенного обсуждения с @ppatierno я считаю, что кластер Strimzi хорошо работает с консольными клиентами Kafka. Вkafkajs
пакет, с другой стороны, продолжает терпеть неудачу с NOT_LEADER_FOR_PARTITION
.
UPDATE Python клиент, похоже, работает без суеты; так что я бросаюkafkajs
.