Тема Kafka и тема реестра схемы

У меня есть вопрос о настройке потокового процессора с Kafka и разными именами темы (брокер Kafka) и темы (Schema Registry).

      spring:
  cloud:
    schema-registry-client:
      endpoint: http://localhost:8081
      cached: true
    stream:
      function:
        definition: process
      default:
        consumer:
          use-native-decoding: true
        producer:
          use-native-encoding: true
          header-mode: none
      bindings:
        process-in-0:
          group: spring-boot-kafka
          destination: abc.bla
          consumer:
            max-attempts: 3
        process-out-0:
          destination: def.bla
      kafka:
        binder:
          auto-add-partitions: false
          auto-create-topics: false
          consumer-properties:
            key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            specific.avro.reader: true
            schema.registry.url: http://localhost:8081
           allow.auto.create.topics: false
           auto.register.schemas: false
          producer-properties:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            schema.registry.url: http://localhost:8081
            auto.register.schemas: false
          brokers:
            - localhost:9092
          configuration:
            allow.auto.create.topics: false
            auto.register.schemas: false
            application.id: "${spring.application.name}"

Сначала кажется, что все работает нормально с брокером Kafka и реестром схемы, но если процессор получает событие, начинается волшебство реестра схемы.

Вместо отправки abc в качестве темы в реестр схем будет отправлено abc.bla. Реестр схемы отвечает, что не найдено.

Ожидается:localhost:8081 / subject / abc / versions Неожиданно и неправильно:localhost:8081 / subject / abc.bla / versions

      error_code  40401
message "Subject not found."

Интересно, что не так, потому что один производитель или клиент-потребитель, похоже, может распознать правильное имя субъекта из темы без явной настройки.

Вот код процессора:

      @SpringBootApplication
@EnableSchemaRegistryClient
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public Function<ABC, DEF> process() {
        return Transformer::transform;
    }
}

Кто-нибудь знает, как я могу настроить io.confluent.kafka.serializers.KafkaAvroDeserializer или io.confluent.kafka.serializers.KafkaAvroSerializer верный?

Большое спасибо, Маркус

1 ответ

Извините, теперь я могу ответить на свой вопрос.

Причиной была внутренняя TopicNameStrategy компании.

      subject:
  name:
    strategy: CompanyInternalStrategy

После этого проблема с темой/темой была исправлена. Подсказка, которую я нашел здесь

Другие вопросы по тегам