Как указать MongoSource (используя Kafka Connect), какой ключ сериализовать

Я использую источник mongo, чтобы прослушивать поток изменений mongo и помещать все события в kafka, но я задыхаюсь, чтобы найти способ извлечь "настоящий" ключ из события. Я попробовал преобразование, но это не сработало, и я получил ошибку:

Caused by: org.apache.kafka.connect.errors.DataException: Only Struct objects supported for [copying fields from value to key], found: java.lang.String

в источнике Mongo я нашел эту строку

что в основном подразумевает, что у него даже нет некоторой обработки ключей, вместо этого он ищет поле "_id" (которое не является идентификатором документа, это информация токена резюме)

вместо этого я хотел бы установить ключ для темы как "documentKey".

вот пример событий, которые получает коннектор:

{
 "_id": {
    "_data": "DSAD45543FFWEHTEY004....."
  },
  "operationType": "replace",
  "clusterTime": {
    "$timestamp": {
      "t": 1446707990,
      "i": 1
    }
  },
  "fullDocument": {
    "_id": {
      "$binary": "FxVFgHFRhrr/z+zUc/w==",
      "$type": "03"
    },
    ...
  },
  "ns": {
    "db": "somedb",
    "coll": "somecol"
  },
  "documentKey": {
    "_id": {
      "$binary": "FxVFgHFRhrr/z+zUc/w==",
      "$type": "03"
    }
  }
}

Я использовал следующую конфигурацию:

"transforms":"createKey",
"transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields":"documentKey"

Я пробовал это с:

org.apache.kafka.connect.json.JsonConverter

а также StringConverter (хотя я не думаю, что это можно сделать со строкой)

org.apache.kafka.connect.storage.StringConverter

Есть ли способ извлечь ключ? Обратите внимание: схема отключена.

2 ответа

Это потому, что MongoDB Source Connector для Kafka еще не поддерживает его. Он должен поддерживать расширенный выбор ключа, начиная с версии 1.3.

https://jira.mongodb.org/browse/KAFKA-40

Обратите внимание: схема отключена

В этом случае вы не можете использовать преобразование ValueToKey. Но даже если бы вы могли, это преобразование не поддерживает вложенные значения в полезную нагрузку, что в вашем случае будет чем-то вродеdocumentKey._id.$binary

Другие вопросы по тегам