Невозможно сгенерировать правильную схему Avro и схему Confluent с использованием Spring Cloud Stream (Spring Boot)

Я извлекаю данные из темы Kafka, используя Avro и Confluent Schema Registry.

Я хочу привести эти данные к пользовательскому типу и записать их в другую тему, также используя Avro и реестр схем Confluent.

Используется рефлексия (ReflectData), но регистрируемая схема содержит только тип "bytes", и полезная нагрузка оказывается нечитаемой.

Пример: "\u001C52465676600010\u0000\u0002\n9003B&POPS PAULINE RENARD"

Потребитель работает отлично.

ReflectData.get().getSchema(MYCLASS.class) дает ожидаемый результат.

Application.yml

spring.cloud.stream.schemaRegistryClient.endpoint: http://147.210.5.141:8081
spring:
  jackson:
    parser:
        ALLOW_UNQUOTED_FIELD_NAMES: true
  cloud:
    stream:
      schema-registry-client:
        endpoint: http://147.210.5.141:8081
      kafka:
        binder:
          brokers: 147.210.5.141:9092
          producer-properties:
            key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            schema.registry.url: http://147.210.5.141:8081
      bindings:
        structures-out:
          destination: es_structure_siret
          contentType: application/*+avro
          producer:
            key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            schema.registry.url: http://147.210.5.141:8081
            headerMode: raw
      schema:
         avro:
           dynamicSchemaGenerationEnabled: true




2019-07-15 10:23:49.191  INFO 13188 --- [container-0-C-1] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    batch.size = 16384
    bootstrap.servers = [147.210.5.141:9092]
    buffer.memory = 33554432
    client.id = 
    compression.type = none
    connections.max.idle.ms = 540000
    enable.idempotence = false
    interceptor.classes = []
    key.serializer = class io.confluent.kafka.serializers.KafkaAvroSerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 0
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class io.confluent.kafka.serializers.KafkaAvroSerializer

2019-07-15 10:23:49.191  INFO 13188 --- [container-0-C-1] i.c.k.s.KafkaAvroSerializerConfig        : KafkaAvroSerializerConfig values: 
    schema.registry.url = [http://147.210.5.141:8081]
    basic.auth.user.info = [hidden]
    auto.register.schemas = true
    max.schemas.per.subject = 1000
    basic.auth.credentials.source = URL
    schema.registry.basic.auth.user.info = [hidden]
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy

2019-07-15 10:23:49.191  INFO 13188 --- [container-0-C-1] i.c.k.s.KafkaAvroSerializerConfig        : KafkaAvroSerializerConfig values: 
    schema.registry.url = [http://147.210.5.141:8081]
    basic.auth.user.info = [hidden]
    auto.register.schemas = true
    max.schemas.per.subject = 1000
    basic.auth.credentials.source = URL
    schema.registry.basic.auth.user.info = [hidden]
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy

0 ответов

Другие вопросы по тегам