Confluent Cloud Apache Kafka Consumer - Тема (и) [test-1] присутствует / отсутствует и отсутствуетTopicsFatal верно

Я новичок, пытающийся наладить взаимодействие между двумя микросервисами Spring Boot с помощью Confluent Cloud Apache Kafka.

При использовании Kafka в Confluent Cloud я получаю следующую ошибку на моем потребителе (ServiceB) после того, как ServiceA публикует сообщение в теме. Однако, когда я вхожу в свое Confluent Cloud, я вижу, что сообщение было успешно опубликовано в теме.

 org.springframework.context.ApplicationContextException: Failed to start bean 
'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is 
 java.lang.IllegalStateException: Topic(s) [topic-1] is/are not present and 
 missingTopicsFatal is true 

Я не сталкиваюсь с этой проблемой, когда запускаю Kafka на моем локальном сервере. ServiceA может опубликовать сообщение в теме на моем локальном сервере Kafka, а ServiceB успешно может использовать это сообщение.

Я упомянул конфигурацию моего локального сервера Kafka в application.properties(как закомментированный код)

Услуга A: ПРОИЗВОДИТЕЛЬ

application.properties

app.topic=test-1
#Remote
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
request.timeout.ms=20000
bootstrap.servers=pkc-4kgmg.us-west-2.aws.confluent.cloud:9092
retry.backoff.ms=500
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule
requiredusername="*******"
password="****"

#Local
#ssl.endpoint.identification.algorithm=https
#security.protocol=SASL_SSL
#sasl.mechanism=PLAIN
#request.timeout.ms=20000
#bootstrap.servers=localhost:9092
#retry.backoff.ms=500
#sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule

Sender.java

public class Sender {

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@Value("${app.topic}")
private String topic;

public void send(String data){
    Message<String> message = MessageBuilder
            .withPayload(data)
            .setHeader(KafkaHeaders.TOPIC, topic)
            .build();
    kafkaTemplate.send(message);
  }
}

KafkaProducerConfig.java

@Configuration
@EnableKafka
public class KafkaProducerConfig {

@Value("${bootstrap.servers}")
private String bootstrapServers;

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return props;
}

@Bean
public ProducerFactory<String, String> producerFactory() {
    return new DefaultKafkaProducerFactory(producerConfigs());
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate(producerFactory());
 }

}

Услуга B: ПОТРЕБИТЕЛЬ

application.properties

app.topic=test-1
#Remote
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
request.timeout.ms=20000
bootstrap.servers=pkc-4kgmg.us-west-2.aws.confluent.cloud:9092
retry.backoff.ms=500
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule
requiredusername="*******"
password="****"

#Local
#ssl.endpoint.identification.algorithm=https
#security.protocol=SASL_SSL
#sasl.mechanism=PLAIN
#request.timeout.ms=20000
#bootstrap.servers=localhost:9092
#retry.backoff.ms=500
#sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule

KafkaConsumerConfig.java

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
  @Value("${bootstrap.servers}")
private String bootstrapServers;

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "confluent_cli_consumer_040e5c14-0c18-4ae6-a10f-8c3ff69cbc1a"); // confluent cloud consumer group-id
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return props;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory(
            consumerConfigs(),
            new StringDeserializer(), new StringDeserializer());
}

@Bean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(consumerFactory());
    return factory;
 }
}

KafkaConsumer.java

@Service
public class KafkaConsumer {
private static final Logger LOG = LoggerFactory.getLogger(KafkaListener.class);

@Value("{app.topic}")
private String kafkaTopic;

  @KafkaListener(topics = "${app.topic}", containerFactory = "kafkaListenerContainerFactory")
  public void receive(@Payload String data) {
    LOG.info("received data='{}'", data);
  }
}

3 ответа

@cricket_007 ответ правильный. Вам необходимо встроить имя пользователя и пароль (в частности, ключ API кластера и секрет API) в значение свойства sasl.jaas.config.

Вы можете дважды проверить, как клиенты Java должны подключаться к Confluent Cloud, с помощью этого официального примера здесь: https://github.com/confluentinc/examples/blob/5.3.1-post/clients/cloud/java/src/main/java/io/confluent/examples/clients/cloud

Спасибо,

- Рикардо

Имя пользователя и пароль являются частью конфигурации JAAS, поэтому поместите их в одну строку

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafkaclient1" password="kafkaclient1-secret";

Я также предлагаю вам убедиться, что ваш файл свойств правильно загружен в клиент.

См. Загрузочную документацию.

Вы не можете просто поместить произвольные свойства kafka непосредственно в файл application.properties.

Свойства, поддерживаемые автоматической конфигурацией, показаны в appendix-application-properties.html. Обратите внимание, что по большей части эти свойства (с дефисом или camelCase) отображаются непосредственно на свойства Apache Kafka с точками. За подробностями обращайтесь к документации Apache Kafka.

Первые несколько из этих свойств применяются ко всем компонентам (производителям, потребителям, администраторам и потокам), но могут быть указаны на уровне компонентов, если вы хотите использовать другие значения. Apache Kafka обозначает свойства с важностью HIGH, MEDIUM или LOW. Автоконфигурация Spring Boot поддерживает все свойства ВЫСОКОЙ важности, некоторые выбранные свойства MEDIUM и LOW, а также любые свойства, не имеющие значения по умолчанию.

Только часть свойств, поддерживаемых Kafka, доступна напрямую через класс KafkaProperties. Если вы хотите настроить производителя или потребителя с дополнительными свойствами, которые напрямую не поддерживаются, используйте следующие свойства:

spring.kafka.properties.prop.one=first
spring.kafka.admin.properties.prop.two=second
spring.kafka.consumer.properties.prop.three=third
spring.kafka.producer.properties.prop.four=fourth
spring.kafka.streams.properties.prop.five=fifth

Это устанавливает общее свойство prop.one Kafka на первое (применяется к производителям, потребителям и администраторам), свойство prop.two admin на второе, потребительское свойство prop.three на третье, свойство производителя prop.four на четвертое, а свойство prop..five свойство потоков к пятому.

...

Другие вопросы по тегам