Spring-Kafka не может работать с Kafka-кластером
Я настроил 3 кафки кластера и пытаюсь использовать с spring-kafka. но когда я убиваю кафку, я не могу отправить другие сообщения в очередь.
Кафка версия 2.0.0 весна-кафка версия 2.0.1
kafka-topics.sh --describe --zookeeper = zoo1: 2181 print
KAFKA_SWARM_TEST PartitionCount:1 ReplicationFactor:2 Configs:
Topic: KAFKA_SWARM_TEST Partition: 0 Leader: 2 Replicas: 1,2 Isr: 2,1
конфиг весна-кафка
spring.kafka.bootstrap-servers="kafka2:9094,kafka1:9093"
лидер kafka2.when я убиваю kafka1. Лидер по-прежнему кафка1. но весной-кафку кину
Connection to node 1 could not be established.Broker may not be available.
Discovered group coordinator kafka1:9093
Похоже, что пружина-Kafka Connect просто использовать Kafka1;
мой код Java
@GetMapping(path = "/send",produces = MediaType.APPLICATION_JSON_VALUE)
public JsonNode send() throws JsonProcessingException {
ObjectNode put = JsonNodeFactory.instance.objectNode().put("status", "success");
String topic = "KAFKA_SWARM_TEST";
val msg = MessageBuilder
.withPayload(objectMapper.writeValueAsString(put))
.setHeader(KafkaHeaders.TOPIC, topic)
.build();
kafkaTemplate.send(msg);
return put;
}
@Bean
public NewTopic topic() {
return new NewTopic("KAFKA_SWARM_TEST", 1, (short) 2);
}
@KafkaListener(groupId="#{T(java.util.UUID).randomUUID().toString()}",topics = "KAFKA_SWARM_TEST")
void testGetInfo(String message) throws IOException {
log.error("getMessage: =====> " + message);
}
конфиг кафки
version: '3.7'
services:
zoo1:
image: wurstmeister/zookeeper
restart: always
ports:
- 2181:2181
environment:
ZOO_MY_ID: 1
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888
zoo2:
image: wurstmeister/zookeeper
restart: always
ports:
- 2180:2181
environment:
ZOO_MY_ID: 2
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888
kafka1:
image: wurstmeister/kafka
restart: always
ports:
- "9093:9093"
depends_on:
- zoo1
- zoo2
privileged: true
environment:
KAFKA_BROKER_ID: 1
KAFKA_ADVERTISED_HOST_NAME: $KAFKA_ADVERTISED_HOST_NAME
KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181
KAFKA_LOG_DIRS: /kafka
KAFKA_SSL_KEYSTORE_LOCATION: /kafka_broker_cert/server.keystore.jks
KAFKA_SSL_KEYSTORE_PASSWORD: ksstone430
KAFKA_SSL_KEY_PASSWORD: ksstone430
KAFKA_SSL_TRUSTSTORE_LOCATION: /kafka_broker_cert/server.truststore.jks
KAFKA_SSL_TRUSTSTORE_PASSWORD: stsstone430
KAFKA_LISTENERS: "PLAINTEXT://:9092,SSL://:9093"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://:9092,SSL://$KAFKA_ADVERTISED_HOST_NAME:9093"
KAFKA_SSL_CLIENT_AUTH: required
LEADER_IMBALANCE_CHECK_INTERVAL_SECONDS: 60
KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "null"
volumes:
- ./kafka_broker_cert:/kafka_broker_cert
- /var/run/docker.sock:/var/run/docker.sock
kafka2:
image: wurstmeister/kafka
restart: always
ports:
- "9094:9093"
depends_on:
- zoo1
- zoo2
environment:
KAFKA_BROKER_ID: 2
KAFKA_ADVERTISED_HOST_NAME: $KAFKA_ADVERTISED_HOST_NAME
KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181
KAFKA_LOG_DIRS: /kafka
KAFKA_SSL_KEYSTORE_LOCATION: /kafka_broker_cert/server.keystore.jks
KAFKA_SSL_KEYSTORE_PASSWORD: ksstone430
KAFKA_SSL_KEY_PASSWORD: ksstone430
KAFKA_SSL_TRUSTSTORE_LOCATION: /kafka_broker_cert/server.truststore.jks
KAFKA_SSL_TRUSTSTORE_PASSWORD: stsstone430
KAFKA_LISTENERS: "PLAINTEXT://:9092,SSL://:9093"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://:9092,SSL://$KAFKA_ADVERTISED_HOST_NAME:9093"
KAFKA_SSL_CLIENT_AUTH: required
LEADER_IMBALANCE_CHECK_INTERVAL_SECONDS: 60
KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "null"
volumes:
- ./kafka_broker_cert:/kafka_broker_cert
- /var/run/docker.sock:/var/run/docker.sock
1 ответ
Попробуйте проверить, работает ли выбор нового лидера кластера Кафки, когда вы убиваете один из узлов (например, лидер, кафка1)
Кроме того, проверьте, есть ли другие конфигурации, которые переопределяют spring.kafka.bootstrap-servers
, Может быть боб, который просто указывает на kafka1:9093
как брокер.
Однако даже если bootstrap-servers
свойство указывает на kafka1:9093
только потребитель должен найти другие узлы брокера в случае настройки узла.