Как изменить название темы, созданной коннектором Kafka Connect Source Connector
У меня уже запущено производство с развернутым Kafka-Cluster и с темой " existing-topic". Я использую MongoDB-Source-Connector от Debezium.
Здесь все, что я хочу, - это направить события CDC непосредственно в тему " existing-topic ", чтобы мои потребители, которые уже слушают эту тему, ее обработали.
Я не нашел ресурсов, чтобы сделать это, однако упоминается, что тема создана в формате ниже -
"Если ваш параметр mongodb.name - A, имя базы данных - B, а имя коллекции - C, данные из базы данных A и коллекции C будут загружены в тему ABC"
Могу ли я изменить тему на "существующую-тему" и перенести в нее события?
2 ответа
Согласно документации,
Название тем Кафки всегда имеет вид
logicalName.databaseName.collectionName
, гдеlogicalName
- логическое имя соединителя, указанное вmongodb.name
свойство конфигурации,databaseName
- имя базы данных, в которой произошла операция, иcollectionName
- это имя коллекции MongoDB, в которой существовал затронутый документ.
Это означает, что если логическое имя вашего коннектора myConnector
и ваша база данных myDatabase
имеет две коллекции users
а также orders
{
"name": "myConnector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"mongodb.hosts": "mongo-db-host:27017",
"mongodb.name": "myDatabase",
"collection.whitelist": "myDatabase[.]*",
}
}
тогда Kafka Connect заполнит две темы именами:
myConnector.myDatabase.users
myConnector.myDatabase.orders
Теперь, если вы все еще хотите изменить имя целевой темы, вы можете использовать Kafka Connect Single Message Transforms (SMT). Точнее, ExtractTopic
должен вам помочь. Обратите внимание, что этот SMT помогает вам извлечь имя темы из ключа или значения сообщения, поэтому вам нужно каким-то образом включить желаемое имя темы в полезные данные.
Например, следующий SMT извлечет значение поля myField
и используйте это как тему записи:
transforms.ValueFieldExample.type=io.confluent.connect.transforms.ExtractTopic$Value
transforms.ValueFieldExample.field=myField
Я столкнулся с той же проблемой с коннектором источника JDBC и нашел другое решение:
С использованием RegexRouter
Преобразование одного сообщения с dropPrefix
вы можете просто переопределить все название темы:
"transforms":"dropPrefix",
"transforms.dropPrefix.regex":"A.B.C", // whole created topic name
"transforms.dropPrefix.replacement":"existing-topic" // whole exisiting topic name
И он работает с регулярным выражением, поэтому, если вы используете несколько таблиц / коллекций и имя созданной темы не является постоянным, вы должны иметь возможность сделать его динамическим.
Это немного взломано, поскольку технически я отбрасываю все название темы, а затем добавляю новое название темы - в любом случае, для меня это не лучшее решение.