Можно ли использовать функции базы данных, такие как dateof(now()), в Kafka Connect Sink с KCQL?

Я пытаюсь сделать простой Раковина из темы Кафки, как INSERT INTO table SELECT * FROM topicно также добавьте столбец insert_timestamp со значением dateof(now())

Возможно ли использование таких функций базы данных в KCQL для коннектора приемника? Можно ли как-то написать в мою тему функцию dateof(now()) без этого просто пытаясь записать значение в виде строки в столбце? Или мне, вероятно, нужно написать собственный разъем приемника для поддержки этого? Я бы использовал значение по умолчанию для столбца, но, похоже, это не поддерживается Cassandra.

1 ответ

Я не знаком с KCQL. Однако для этого можно использовать преобразования отдельных сообщений, которые являются частью API Apache Kafka Connect. Например, если вам нужна метка времени сообщения Kafka, этот конфиг в вашем коннекторе должен сделать это:

"transforms":"InsertMsgTS",
"transforms.InsertMsgTS.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertMsgTS.timestamp.field":"message_ts",

Если вам нужно абсолютное время вставки (а не отметка времени сообщения Kafka), вам нужно написать собственное преобразование ( javadocs).

Другие вопросы по тегам