Тема Kafka и тема реестра схемы
У меня есть вопрос о настройке потокового процессора с Kafka и разными именами темы (брокер Kafka) и темы (Schema Registry).
spring:
cloud:
schema-registry-client:
endpoint: http://localhost:8081
cached: true
stream:
function:
definition: process
default:
consumer:
use-native-decoding: true
producer:
use-native-encoding: true
header-mode: none
bindings:
process-in-0:
group: spring-boot-kafka
destination: abc.bla
consumer:
max-attempts: 3
process-out-0:
destination: def.bla
kafka:
binder:
auto-add-partitions: false
auto-create-topics: false
consumer-properties:
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
specific.avro.reader: true
schema.registry.url: http://localhost:8081
allow.auto.create.topics: false
auto.register.schemas: false
producer-properties:
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
schema.registry.url: http://localhost:8081
auto.register.schemas: false
brokers:
- localhost:9092
configuration:
allow.auto.create.topics: false
auto.register.schemas: false
application.id: "${spring.application.name}"
Сначала кажется, что все работает нормально с брокером Kafka и реестром схемы, но если процессор получает событие, начинается волшебство реестра схемы.
Вместо отправки abc в качестве темы в реестр схем будет отправлено abc.bla. Реестр схемы отвечает, что не найдено.
Ожидается:localhost:8081 / subject / abc / versions Неожиданно и неправильно:localhost:8081 / subject / abc.bla / versions
error_code 40401
message "Subject not found."
Интересно, что не так, потому что один производитель или клиент-потребитель, похоже, может распознать правильное имя субъекта из темы без явной настройки.
Вот код процессора:
@SpringBootApplication
@EnableSchemaRegistryClient
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public Function<ABC, DEF> process() {
return Transformer::transform;
}
}
Кто-нибудь знает, как я могу настроить
io.confluent.kafka.serializers.KafkaAvroDeserializer
или
io.confluent.kafka.serializers.KafkaAvroSerializer
верный?
Большое спасибо, Маркус
1 ответ
Извините, теперь я могу ответить на свой вопрос.
Причиной была внутренняя TopicNameStrategy компании.
subject:
name:
strategy: CompanyInternalStrategy
После этого проблема с темой/темой была исправлена. Подсказка, которую я нашел здесь