Невозможно подключить производителя / потребителя к облачному брокеру kafka после развертывания на k8s с помощью strimzi
Я не могу заставить продюсера писать на тему кафки. Моя служба кафка была развернута на k8s с помощью strimzi. Мой кластер k8s имеет 2 узла в облачной службе Google.
Как видно из конфигурации k8s, все службы включены:
В kafka-cluster-kafka-external-bootstrap
это услуга (nodeport
) для связи с брокером кафка. В основном он перенаправляет запросы от внешнего узла к внутренней брокерской службе. Вот некоторые подробности:
Следуя руководству (в котором в качестве примера кластера используется minikube), я извлек IP-адреса узлов:
kubectl get nodes --output=jsonpath='{range .items[*]}{.status.addresses[?(@.type=="ExternalIP")].address}{"\n"}{end}'
35.xxx.xxx.xxx
34.xxx.xxx.xxx
(основное отличие от руководства в том, что я использую "ExternalIP" вместо "InternalIP", потому что все делаю удаленно)
Затем я ищу порт, на котором выставлен сервис:
kubectl get service kafka-cluster-kafka-external-bootstrap -n xxxx-kafka -o=jsonpath='{.spec.ports[0].nodePort}{"\n"}'
30680
Но когда я пытаюсь запустить своего продюсера, используя локальную корзину apache-kafka, я получаю следующее:
sh kafka-console-producer.sh --broker-list 35.xxx.xxx.xxx:30680 --topic test
>[2020-02-11 16:37:18,388] WARN [Producer clientId=console-producer] Connection to node -1 (/35.xxx.xxx.xxx:30680) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
Итак, я попытался пропинговать ip, чтобы узнать, доступен ли он:
ping 35.xxx.xxx.xxx
PING 35.xxx.xxx.xxx (35.xxx.xxx.xxx) 56(84) bytes of data.
64 bytes from 35.xxx.xxx.xxx: icmp_seq=1 ttl=54 time=63.4 ms
64 bytes from 35.xxx.xxx.xxx: icmp_seq=2 ttl=54 time=51.4 ms
Он доступен, но порт нет:
telnet 35.xxx.xxx.xxx 30680
Trying 35.xxx.xxx.xxx...
telnet: Unable to connect to remote host: Connection timed out
Это моя конфигурация yaml для кластера kafka:
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
creationTimestamp: "2020-02-10T15:04:38Z"
generation: 1
name: kafka-cluster
namespace: xxxx-kafka
resourceVersion: "5868409"
selfLink: /apis/kafka.strimzi.io/v1beta1/namespaces/xxxx-kafka/kafkas/kafka-cluster
uid: 93d0d9b6-7c88-4e01-af9c-49f9fcaac1d1
spec:
entityOperator:
topicOperator: {}
userOperator: {}
kafka:
config:
offsets.topic.replication.factor: 1
transaction.state.log.min.isr: 1
transaction.state.log.replication.factor: 1
listeners:
external:
tls: false
type: nodeport
plain: {}
tls: {}
replicas: 1
storage:
type: jbod
volumes:
- deleteClaim: false
id: 0
size: 100Gi
type: persistent-claim
zookeeper:
replicas: 1
storage:
deleteClaim: false
size: 100Gi
type: persistent-claim
status:
conditions:
- lastTransitionTime: 2020-02-11T15:33:50+0000
status: "True"
type: Ready
listeners:
- addresses:
- host: kafka-cluster-kafka-bootstrap.xxxx-kafka.svc
port: 9092
type: plain
- addresses:
- host: kafka-cluster-kafka-bootstrap.xxxx-kafka.svc
port: 9093
type: tls
- addresses:
- host: <AnyNodeAddress>
port: 30680
type: external
observedGeneration: 1
Кто-нибудь знает, почему возникает такая проблема и как правильно запустить производителя?
РЕДАКТИРОВАТЬ:
это ямл kafka-cluster-kafka-bootstrap
apiVersion: v1
kind: Service
metadata:
creationTimestamp: "2020-02-11T15:31:57Z"
labels:
app.kubernetes.io/instance: kafka-cluster
app.kubernetes.io/managed-by: strimzi-cluster-operator
app.kubernetes.io/name: strimzi
strimzi.io/cluster: kafka-cluster
strimzi.io/kind: Kafka
strimzi.io/name: kafka-cluster-kafka-bootstrap
name: kafka-cluster-kafka-bootstrap
namespace: xxxx-kafka
ownerReferences:
- apiVersion: kafka.strimzi.io/v1beta1
blockOwnerDeletion: false
controller: false
kind: Kafka
name: kafka-cluster
uid: 93d0d9b6-7c88-4e01-af9c-49f9fcaac1d1
resourceVersion: "5867902"
selfLink: /api/v1/namespaces/xxxx-kafka/services/kafka-cluster-kafka-bootstrap
uid: 3dfdbd89-fc97-43d3-8e10-4f6a23eae20e
spec:
clusterIP: 10.0.2.111
ports:
- name: replication
port: 9091
protocol: TCP
targetPort: 9091
- name: clients
port: 9092
protocol: TCP
targetPort: 9092
- name: clientstls
port: 9093
protocol: TCP
targetPort: 9093
selector:
strimzi.io/cluster: kafka-cluster
strimzi.io/kind: Kafka
strimzi.io/name: kafka-cluster-kafka
sessionAffinity: None
type: ClusterIP
status:
loadBalancer: {}
Я также пытался подключиться внутренне через узел GCP, но у меня та же проблема. Я извлек внутренние Ips:
kubectl get nodes --output=jsonpath='{range .items[*]}{.status.addresses[?(@.type=="InternalIP")].address}{"\n"}{end}'
10.132.0.4
10.132.0.5
Затем я попытался подключить производителя из узла GCP и вот что у меня получилось:
sh kafka-console-producer.sh --broker-list 10.132.0.4:30680 --topic test_topic
>hello
[2020-02-12 15:31:15,033] ERROR Error when sending message to topic test_topic with key: null, value: 4 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Topic test_topic not present in metadata after 60000 ms.
>[2020-02-12 15:32:24,629] WARN [Producer clientId=console-producer] Connection to node 0 (/34.xxx.xxx.xxx:30350) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
Теперь, прежде чем получить ошибку подключения, я получил еще одну:
[2020-02-12 15:31:15,033] ERROR Error when sending message to topic test_topic with key: null, value: 4 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Topic test_topic not present in metadata after 60000 ms.
Это kafka-cluster-kafka-brokers
yaml:
apiVersion: v1
kind: Service
metadata:
annotations:
service.alpha.kubernetes.io/tolerate-unready-endpoints: "true"
creationTimestamp: "2020-02-11T15:31:57Z"
labels:
app.kubernetes.io/instance: kafka-cluster
app.kubernetes.io/managed-by: strimzi-cluster-operator
app.kubernetes.io/name: strimzi
strimzi.io/cluster: kafka-cluster
strimzi.io/kind: Kafka
strimzi.io/name: kafka-cluster-kafka-brokers
name: kafka-cluster-kafka-brokers
namespace: xxxx-kafka
ownerReferences:
- apiVersion: kafka.strimzi.io/v1beta1
blockOwnerDeletion: false
controller: false
kind: Kafka
name: kafka-cluster
uid: 93d0d9b6-7c88-4e01-af9c-49f9fcaac1d1
resourceVersion: "5867905"
selfLink: /api/v1/namespaces/xxxx-kafka/services/kafka-cluster-kafka-brokers
uid: 42d7fa08-cf52-47e1-9746-89a45d65351b
spec:
clusterIP: None
ports:
- name: replication
port: 9091
protocol: TCP
targetPort: 9091
- name: clients
port: 9092
protocol: TCP
targetPort: 9092
- name: clientstls
port: 9093
protocol: TCP
targetPort: 9093
publishNotReadyAddresses: true
selector:
strimzi.io/cluster: kafka-cluster
strimzi.io/kind: Kafka
strimzi.io/name: kafka-cluster-kafka
sessionAffinity: None
type: ClusterIP
status:
loadBalancer: {}