Kafka Connect не работает с предметными стратегиями

контекст

Я кодировал пару маленьких разъемов Kafka Connect. Тот, который просто генерирует случайные данные каждую секунду, а другой, который регистрирует их в консоли. Они интегрированы с реестром схемы, поэтому данные сериализуются с помощью Avro.

Я развернул их в локальной среде Kafka, используя образ Docker fast-data-dev, предоставленный Landoop

Базовая настройка работает и выдает сообщение каждую секунду, которая регистрируется

Тем не менее, я хочу изменить стратегию имени субъекта. По умолчанию генерируется два предмета:

  • ${topic}-key
  • ${topic}-value

Согласно моему сценарию использования, мне нужно будет генерировать события с разными схемами, которые будут заканчиваться в той же теме. Поэтому названия предметов, которые мне нужны:

  • ${topic}-${keyRecordName}
  • ${topic}-${valueRecordName}

Согласно документации мои потребности вписываются в TopicRecordNameStrategy

Что я пробовал

Я создаю avroData Объект для отправки значений для подключения:

class SampleSourceConnectorTask : SourceTask() {

    private lateinit var avroData: AvroData 

    override fun start(props: Map<String, String>) {
        [...]
        avroData = AvroData(AvroDataConfig(props))
    }

и использовать его впоследствии для создания SourceRecord объекты ответа

В документации говорится, что для использования реестра схем в Kafka Connect необходимо установить некоторые свойства в конфигурации коннектора. Поэтому, когда я создаю его, я добавляю их:

name=SampleSourceConnector
connector.class=[...]
tasks.max=1
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy

проблема

Разъем, кажется, игнорирует эти свойства и продолжает использовать старый ${topic}-key а также ${topic}-value предметы.

Вопрос

Предполагается, что Kafka Connect поддерживает разные предметные стратегии. Мне удалось обойти проблему, написав свою собственную версию AvroConverter и жестко закодировать, что предметная стратегия - это то, что мне нужно. Однако это не выглядит хорошим подходом и также вызывает проблемы при попытке использовать данные с помощью Sink Kafka Connector. Я продублировал тему, так что есть версия со старым именем (${topic}-key) и это работает

Как правильно настроить указанную стратегию для Kafka Connect?

1 ответ

Решение

Вы скучаете по key.converter а также value.converter префикс для передачи конфигурации в конвектор. Так что вместо:

key.subject.name.strategy
value.subject.name.strategy

ты хочешь:

key.converter.key.subject.name.strategy
value.converter.value.subject.name.strategy

Источник https://docs.confluent.io/current/connect/managing/configuring.html:

Чтобы передать параметры конфигурации преобразователям ключей и значений, добавьте к ним префикс key.converter. или же value.converter. как в рабочей конфигурации при определении конвертеров по умолчанию. Обратите внимание, что они используются только в том случае, если соответствующая конфигурация преобразователя указана в key.converter или же value.converter свойства.

Другие вопросы по тегам