ошибка схемы производителя confluent kafka avro

Я использую пример кода из https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/avro_producer.py для загрузки данных в тему. Я сделал только одно изменение, и я добавил "default": null в каждое поле для совместимости схемы. Он загружается нормально, поскольку я вижу сообщение и схему в http://localhost:9021/. Я также могу видеть данные, поступающие в тему, если я запускаю команду kafka-avro-console-consumer через cli. Но пытаясь использовать приемник с красным смещением с конфигурацией, указанной в https://docs.confluent.io/current/connect/kafka-connect-aws-redshift/index.html, я получаю следующую ошибку, как показано ниже. Однако, если я не добавлю "default": null в поля, то все будет нормально. Любое руководство будет очень признательно.

org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
    at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)
    at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1812)
    at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1567)
    at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1687)
    at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1543)
    at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1226)
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:108)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:491)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:491)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:468)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:324)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:200)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "null", schema type: INT32
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:220)
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
    at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)

0 ответов

Недостаточно добавить "default: null", вам нужно изменить тип, чтобы он был примерно таким:

type: ["null", "string"], default: null

позаботившись добавить "null" к типу в первой позиции, т.е. не

type: ["string", "null"], default: null

См. Обсуждение по адресу: http://apache-avro.679487.n3.nabble.com/How-to-declare-an-optional-field-tp4025089p4025094.html