Можно ли использовать функции базы данных, такие как dateof(now()), в Kafka Connect Sink с KCQL?
Я пытаюсь сделать простой Раковина из темы Кафки, как INSERT INTO table SELECT * FROM topic
но также добавьте столбец insert_timestamp со значением dateof(now())
Возможно ли использование таких функций базы данных в KCQL для коннектора приемника? Можно ли как-то написать в мою тему функцию dateof(now())
без этого просто пытаясь записать значение в виде строки в столбце? Или мне, вероятно, нужно написать собственный разъем приемника для поддержки этого? Я бы использовал значение по умолчанию для столбца, но, похоже, это не поддерживается Cassandra.
1 ответ
Я не знаком с KCQL. Однако для этого можно использовать преобразования отдельных сообщений, которые являются частью API Apache Kafka Connect. Например, если вам нужна метка времени сообщения Kafka, этот конфиг в вашем коннекторе должен сделать это:
"transforms":"InsertMsgTS",
"transforms.InsertMsgTS.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertMsgTS.timestamp.field":"message_ts",
Если вам нужно абсолютное время вставки (а не отметка времени сообщения Kafka), вам нужно написать собственное преобразование ( javadocs).