Confluent Cloud Apache Kafka Consumer - Тема (и) [test-1] присутствует / отсутствует и отсутствуетTopicsFatal верно
Я новичок, пытающийся наладить взаимодействие между двумя микросервисами Spring Boot с помощью Confluent Cloud Apache Kafka.
При использовании Kafka в Confluent Cloud я получаю следующую ошибку на моем потребителе (ServiceB) после того, как ServiceA публикует сообщение в теме. Однако, когда я вхожу в свое Confluent Cloud, я вижу, что сообщение было успешно опубликовано в теме.
org.springframework.context.ApplicationContextException: Failed to start bean
'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is
java.lang.IllegalStateException: Topic(s) [topic-1] is/are not present and
missingTopicsFatal is true
Я не сталкиваюсь с этой проблемой, когда запускаю Kafka на моем локальном сервере. ServiceA может опубликовать сообщение в теме на моем локальном сервере Kafka, а ServiceB успешно может использовать это сообщение.
Я упомянул конфигурацию моего локального сервера Kafka в application.properties(как закомментированный код)
Услуга A: ПРОИЗВОДИТЕЛЬ
application.properties
app.topic=test-1
#Remote
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
request.timeout.ms=20000
bootstrap.servers=pkc-4kgmg.us-west-2.aws.confluent.cloud:9092
retry.backoff.ms=500
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule
requiredusername="*******"
password="****"
#Local
#ssl.endpoint.identification.algorithm=https
#security.protocol=SASL_SSL
#sasl.mechanism=PLAIN
#request.timeout.ms=20000
#bootstrap.servers=localhost:9092
#retry.backoff.ms=500
#sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule
Sender.java
public class Sender {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Value("${app.topic}")
private String topic;
public void send(String data){
Message<String> message = MessageBuilder
.withPayload(data)
.setHeader(KafkaHeaders.TOPIC, topic)
.build();
kafkaTemplate.send(message);
}
}
KafkaProducerConfig.java
@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Value("${bootstrap.servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory(producerConfigs());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate(producerFactory());
}
}
Услуга B: ПОТРЕБИТЕЛЬ
application.properties
app.topic=test-1
#Remote
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
request.timeout.ms=20000
bootstrap.servers=pkc-4kgmg.us-west-2.aws.confluent.cloud:9092
retry.backoff.ms=500
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule
requiredusername="*******"
password="****"
#Local
#ssl.endpoint.identification.algorithm=https
#security.protocol=SASL_SSL
#sasl.mechanism=PLAIN
#request.timeout.ms=20000
#bootstrap.servers=localhost:9092
#retry.backoff.ms=500
#sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule
KafkaConsumerConfig.java
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${bootstrap.servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "confluent_cli_consumer_040e5c14-0c18-4ae6-a10f-8c3ff69cbc1a"); // confluent cloud consumer group-id
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory(
consumerConfigs(),
new StringDeserializer(), new StringDeserializer());
}
@Bean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
KafkaConsumer.java
@Service
public class KafkaConsumer {
private static final Logger LOG = LoggerFactory.getLogger(KafkaListener.class);
@Value("{app.topic}")
private String kafkaTopic;
@KafkaListener(topics = "${app.topic}", containerFactory = "kafkaListenerContainerFactory")
public void receive(@Payload String data) {
LOG.info("received data='{}'", data);
}
}
3 ответа
@cricket_007 ответ правильный. Вам необходимо встроить имя пользователя и пароль (в частности, ключ API кластера и секрет API) в значение свойства sasl.jaas.config.
Вы можете дважды проверить, как клиенты Java должны подключаться к Confluent Cloud, с помощью этого официального примера здесь: https://github.com/confluentinc/examples/blob/5.3.1-post/clients/cloud/java/src/main/java/io/confluent/examples/clients/cloud
Спасибо,
- Рикардо
Имя пользователя и пароль являются частью конфигурации JAAS, поэтому поместите их в одну строку
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafkaclient1" password="kafkaclient1-secret";
Я также предлагаю вам убедиться, что ваш файл свойств правильно загружен в клиент.
Вы не можете просто поместить произвольные свойства kafka непосредственно в файл application.properties.
Свойства, поддерживаемые автоматической конфигурацией, показаны в appendix-application-properties.html. Обратите внимание, что по большей части эти свойства (с дефисом или camelCase) отображаются непосредственно на свойства Apache Kafka с точками. За подробностями обращайтесь к документации Apache Kafka.
Первые несколько из этих свойств применяются ко всем компонентам (производителям, потребителям, администраторам и потокам), но могут быть указаны на уровне компонентов, если вы хотите использовать другие значения. Apache Kafka обозначает свойства с важностью HIGH, MEDIUM или LOW. Автоконфигурация Spring Boot поддерживает все свойства ВЫСОКОЙ важности, некоторые выбранные свойства MEDIUM и LOW, а также любые свойства, не имеющие значения по умолчанию.
Только часть свойств, поддерживаемых Kafka, доступна напрямую через класс KafkaProperties. Если вы хотите настроить производителя или потребителя с дополнительными свойствами, которые напрямую не поддерживаются, используйте следующие свойства:
spring.kafka.properties.prop.one=first
spring.kafka.admin.properties.prop.two=second
spring.kafka.consumer.properties.prop.three=third
spring.kafka.producer.properties.prop.four=fourth
spring.kafka.streams.properties.prop.five=fifth
Это устанавливает общее свойство prop.one Kafka на первое (применяется к производителям, потребителям и администраторам), свойство prop.two admin на второе, потребительское свойство prop.three на третье, свойство производителя prop.four на четвертое, а свойство prop..five свойство потоков к пятому.
...