MongoDB Kafka Sink Connector не обрабатывает процессор RenameByRegex
Мне нужно прослушивать события из Kafka Topic и Sink в коллекцию в MongoDB. Сообщение содержит вложенный объект со свойством id, как в примере выше.
{
"testId": 1,
"foo": "bar",
"foos": [{ "id":"aaaaqqqq-rrrrr" }]
}
Я пытаюсь переименовать этот вложенный идентификатор в _id с помощью RegExp
{
"connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
"topics": "test",
"connection.uri": "mongodb://mongo:27017",
"database": "test_db",
"collection": "test",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
"value.projection.list":"testId",
"value.projection.type": "whitelist",
"post.processor.chain": "com.mongodb.kafka.connect.sink.processor.DocumentIdAdder, com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByRegex",
"field.renamer.regexp": "[{\"regexp\":\"\b(id)\b\", \"pattern\":\"\b(id)\b\",\"replace\":\"_id\"}]"
}
И результат config/validate 500 Internal Server Error
, с этим сообщением:
{
"error_code": 500,
"message": null
}
Я что-то упустил или это проблема?
1 ответ
Я думаю, все, что вам нужно, это Kafka Connect Single Message Transform (SMT), а точнее ReplaceField
:
Отфильтруйте или переименуйте поля в структуре или карте.
Следующее заменит id
имя поля с _id
:
"transforms": "RenameField",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "id:_id"
В вашем случае перед применением вышеупомянутой трансформации вы также можете захотеть Flatten
foos
:
"transforms": "flatten",
"transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter": "."
и, наконец, примените преобразование для переименования поля:
"transforms": "RenameField",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "foos.id:foos._id"