Spring-Kafka не может работать с Kafka-кластером

Я настроил 3 кафки кластера и пытаюсь использовать с spring-kafka. но когда я убиваю кафку, я не могу отправить другие сообщения в очередь.

Кафка версия 2.0.0 весна-кафка версия 2.0.1

kafka-topics.sh --describe --zookeeper = zoo1: 2181 print

KAFKA_SWARM_TEST  PartitionCount:1        ReplicationFactor:2     Configs:
        Topic: KAFKA_SWARM_TEST Partition: 0    Leader: 2       Replicas: 1,2   Isr: 2,1

конфиг весна-кафка

spring.kafka.bootstrap-servers="kafka2:9094,kafka1:9093"

лидер kafka2.when я убиваю kafka1. Лидер по-прежнему кафка1. но весной-кафку кину

 Connection to node 1 could not be established.Broker may not be available.
 Discovered group coordinator kafka1:9093

Похоже, что пружина-Kafka Connect просто использовать Kafka1;

мой код Java

    @GetMapping(path = "/send",produces = MediaType.APPLICATION_JSON_VALUE)
    public JsonNode send() throws JsonProcessingException {
        ObjectNode put = JsonNodeFactory.instance.objectNode().put("status", "success");
        String topic = "KAFKA_SWARM_TEST";
        val msg = MessageBuilder
                .withPayload(objectMapper.writeValueAsString(put))
                .setHeader(KafkaHeaders.TOPIC, topic)
                .build();
        kafkaTemplate.send(msg);
        return put;
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("KAFKA_SWARM_TEST", 1, (short) 2);
    }

    @KafkaListener(groupId="#{T(java.util.UUID).randomUUID().toString()}",topics = "KAFKA_SWARM_TEST")
    void testGetInfo(String message) throws IOException {
        log.error("getMessage: =====> " + message);
    }

конфиг кафки

version: '3.7'

services:

  zoo1:
    image: wurstmeister/zookeeper
    restart: always
    ports:
      - 2181:2181
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888
  zoo2:
    image: wurstmeister/zookeeper
    restart: always
    ports:
      - 2180:2181
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888


  kafka1:
    image: wurstmeister/kafka
    restart: always
    ports:
    - "9093:9093"
    depends_on:
      - zoo1
      - zoo2
    privileged: true
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ADVERTISED_HOST_NAME: $KAFKA_ADVERTISED_HOST_NAME
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181
      KAFKA_LOG_DIRS: /kafka
      KAFKA_SSL_KEYSTORE_LOCATION: /kafka_broker_cert/server.keystore.jks
      KAFKA_SSL_KEYSTORE_PASSWORD: ksstone430
      KAFKA_SSL_KEY_PASSWORD: ksstone430
      KAFKA_SSL_TRUSTSTORE_LOCATION: /kafka_broker_cert/server.truststore.jks
      KAFKA_SSL_TRUSTSTORE_PASSWORD: stsstone430
      KAFKA_LISTENERS: "PLAINTEXT://:9092,SSL://:9093"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://:9092,SSL://$KAFKA_ADVERTISED_HOST_NAME:9093"
      KAFKA_SSL_CLIENT_AUTH: required
      LEADER_IMBALANCE_CHECK_INTERVAL_SECONDS: 60
      KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "null"
    volumes:
      - ./kafka_broker_cert:/kafka_broker_cert
      - /var/run/docker.sock:/var/run/docker.sock

  kafka2:
    image: wurstmeister/kafka
    restart: always
    ports:
    - "9094:9093"
    depends_on:
      - zoo1
      - zoo2
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ADVERTISED_HOST_NAME: $KAFKA_ADVERTISED_HOST_NAME
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181
      KAFKA_LOG_DIRS: /kafka
      KAFKA_SSL_KEYSTORE_LOCATION: /kafka_broker_cert/server.keystore.jks
      KAFKA_SSL_KEYSTORE_PASSWORD: ksstone430
      KAFKA_SSL_KEY_PASSWORD: ksstone430
      KAFKA_SSL_TRUSTSTORE_LOCATION: /kafka_broker_cert/server.truststore.jks
      KAFKA_SSL_TRUSTSTORE_PASSWORD: stsstone430
      KAFKA_LISTENERS: "PLAINTEXT://:9092,SSL://:9093"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://:9092,SSL://$KAFKA_ADVERTISED_HOST_NAME:9093"
      KAFKA_SSL_CLIENT_AUTH: required
      LEADER_IMBALANCE_CHECK_INTERVAL_SECONDS: 60
      KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "null"
    volumes:
      - ./kafka_broker_cert:/kafka_broker_cert
      - /var/run/docker.sock:/var/run/docker.sock

1 ответ

Попробуйте проверить, работает ли выбор нового лидера кластера Кафки, когда вы убиваете один из узлов (например, лидер, кафка1)

Кроме того, проверьте, есть ли другие конфигурации, которые переопределяют spring.kafka.bootstrap-servers, Может быть боб, который просто указывает на kafka1:9093 как брокер.

Однако даже если bootstrap-servers свойство указывает на kafka1:9093 только потребитель должен найти другие узлы брокера в случае настройки узла.

Другие вопросы по тегам