Kafka Connect не работает с предметными стратегиями
контекст
Я кодировал пару маленьких разъемов Kafka Connect. Тот, который просто генерирует случайные данные каждую секунду, а другой, который регистрирует их в консоли. Они интегрированы с реестром схемы, поэтому данные сериализуются с помощью Avro.
Я развернул их в локальной среде Kafka, используя образ Docker fast-data-dev, предоставленный Landoop
Базовая настройка работает и выдает сообщение каждую секунду, которая регистрируется
Тем не менее, я хочу изменить стратегию имени субъекта. По умолчанию генерируется два предмета:
${topic}-key
${topic}-value
Согласно моему сценарию использования, мне нужно будет генерировать события с разными схемами, которые будут заканчиваться в той же теме. Поэтому названия предметов, которые мне нужны:
${topic}-${keyRecordName}
${topic}-${valueRecordName}
Согласно документации мои потребности вписываются в TopicRecordNameStrategy
Что я пробовал
Я создаю avroData
Объект для отправки значений для подключения:
class SampleSourceConnectorTask : SourceTask() {
private lateinit var avroData: AvroData
override fun start(props: Map<String, String>) {
[...]
avroData = AvroData(AvroDataConfig(props))
}
и использовать его впоследствии для создания SourceRecord
объекты ответа
В документации говорится, что для использования реестра схем в Kafka Connect необходимо установить некоторые свойства в конфигурации коннектора. Поэтому, когда я создаю его, я добавляю их:
name=SampleSourceConnector
connector.class=[...]
tasks.max=1
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
проблема
Разъем, кажется, игнорирует эти свойства и продолжает использовать старый ${topic}-key
а также ${topic}-value
предметы.
Вопрос
Предполагается, что Kafka Connect поддерживает разные предметные стратегии. Мне удалось обойти проблему, написав свою собственную версию AvroConverter
и жестко закодировать, что предметная стратегия - это то, что мне нужно. Однако это не выглядит хорошим подходом и также вызывает проблемы при попытке использовать данные с помощью Sink Kafka Connector. Я продублировал тему, так что есть версия со старым именем (${topic}-key
) и это работает
Как правильно настроить указанную стратегию для Kafka Connect?
1 ответ
Вы скучаете по key.converter
а также value.converter
префикс для передачи конфигурации в конвектор. Так что вместо:
key.subject.name.strategy
value.subject.name.strategy
ты хочешь:
key.converter.key.subject.name.strategy
value.converter.value.subject.name.strategy
Источник https://docs.confluent.io/current/connect/managing/configuring.html:
Чтобы передать параметры конфигурации преобразователям ключей и значений, добавьте к ним префикс
key.converter.
или жеvalue.converter.
как в рабочей конфигурации при определении конвертеров по умолчанию. Обратите внимание, что они используются только в том случае, если соответствующая конфигурация преобразователя указана вkey.converter
или жеvalue.converter
свойства.