Kafka Connect экспортирует несколько типов событий из одной темы

Я пытаюсь использовать новую функцию ( https://www.confluent.io/blog/put-several-event-types-kafka-topic/), касающуюся хранения двух разных типов событий на одну и ту же тему. На самом деле я использую Confluent версии 4.1.0 и установить эти свойства ниже, чтобы это произошло

properties.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY,TopicRecordNameStrategy.class.getName());
properties.put("value.multi.type", true);

Данные записываются в тему без проблем и могут рассматриваться в приложении Kafka Streams как Generic Avro Records. Также в реестре схемы Kafka создаются две новые записи, по одной для каждого события, проводимого по этой конкретной теме.

Проблема, с которой я сталкиваюсь, заключается в том, что я не могу экспортировать эти данные из этой темы с помощью Kafka Connect. В простейшем случае, когда я использую File Sink Connector, как показано ниже

{
  "name": "sink-connector",
  "config": {
      "topics": "source-topic",
      "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
      "tasks.max": 1,
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "key.converter.schema.registry.url":"http://kafka-schema-registry:8081",
      "value.converter":"io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url":"http://kafka-schema-registry:8081",
      "value.subject.name.strategy":"io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
      "file": "/tmp/sink-file.txt"
    }
}

Я получаю сообщение об ошибке от Connector, которое похоже на ошибку сериализации, основанную на AvroConverter, как показано здесь

org.apache.kafka.connect.errors.DataException: source-topic
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:95)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:468)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 2
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:202)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:229)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:296)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:284)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersionFromRegistry(CachedSchemaRegistryClient.java:125)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersion(CachedSchemaRegistryClient.java:236)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:152)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:194)
    at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:120)
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:83)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:468)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Обратите внимание, что в реестре схемы есть схема Avro с идентификатором 2 и другая с идентификатором схемы 3, которые описывают два события, размещенные в одной и той же теме. Те же проблемы возникают при использовании коннектора JDBC.

Итак, как мне справиться с этим делом, чтобы экспортировать данные из моего кластера Kafka во внешнюю систему. Я что-то упустил в своей конфигурации? Можно ли иметь тему с несколькими типами событий и экспортировать их через Kafka Connect?

1 ответ

Нашел решение. Мой код передавал ключ как String и значение как avro. Hive-sink при чтении попытался найти avro-схему ключа и не смог ее найти. Добавление свойства key.converter = org.apache.kafka.connect.storage.StringConverter key.converter.schema.registry.url = http://localhost:8081/ помогло решить проблему.

Другие вопросы по тегам