Как указать MongoSource (используя Kafka Connect), какой ключ сериализовать
Я использую источник mongo, чтобы прослушивать поток изменений mongo и помещать все события в kafka, но я задыхаюсь, чтобы найти способ извлечь "настоящий" ключ из события. Я попробовал преобразование, но это не сработало, и я получил ошибку:
Caused by: org.apache.kafka.connect.errors.DataException: Only Struct objects supported for [copying fields from value to key], found: java.lang.String
в источнике Mongo я нашел эту строку
что в основном подразумевает, что у него даже нет некоторой обработки ключей, вместо этого он ищет поле "_id" (которое не является идентификатором документа, это информация токена резюме)
вместо этого я хотел бы установить ключ для темы как "documentKey".
вот пример событий, которые получает коннектор:
{
"_id": {
"_data": "DSAD45543FFWEHTEY004....."
},
"operationType": "replace",
"clusterTime": {
"$timestamp": {
"t": 1446707990,
"i": 1
}
},
"fullDocument": {
"_id": {
"$binary": "FxVFgHFRhrr/z+zUc/w==",
"$type": "03"
},
...
},
"ns": {
"db": "somedb",
"coll": "somecol"
},
"documentKey": {
"_id": {
"$binary": "FxVFgHFRhrr/z+zUc/w==",
"$type": "03"
}
}
}
Я использовал следующую конфигурацию:
"transforms":"createKey",
"transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields":"documentKey"
Я пробовал это с:
org.apache.kafka.connect.json.JsonConverter
а также StringConverter (хотя я не думаю, что это можно сделать со строкой)
org.apache.kafka.connect.storage.StringConverter
Есть ли способ извлечь ключ? Обратите внимание: схема отключена.
2 ответа
Это потому, что MongoDB Source Connector для Kafka еще не поддерживает его. Он должен поддерживать расширенный выбор ключа, начиная с версии 1.3.
Обратите внимание: схема отключена
В этом случае вы не можете использовать преобразование ValueToKey. Но даже если бы вы могли, это преобразование не поддерживает вложенные значения в полезную нагрузку, что в вашем случае будет чем-то вродеdocumentKey._id.$binary