Apache Nifi для MS SQL CDC с использованием динамического SQL-запроса

В нашей унаследованной архитектуре у нас есть база данных сервера MS SQL, которая хранит всю информацию о датчиках практически в реальном времени, в среднем за секунду она получает 100 записей. Чтобы получить полную информацию о событиях датчиков, нам нужно присоединиться 2-3 таблицы в базе данных.

Sample Query:

SELECT SOMETHING
FROM TABLE1 AS tab1 
INNER JOIN TABLE2 AS tab2 ON tab1.UpdateID=tab2.ID 
INNER JOIN TABLE3 as tab3 ON tab1.TagID=tab3.ID 
WHERE tab2.UpdateTime > ${lastExtractUnixTime}

Наше требование состоит в том, чтобы получать данные захвата вышеупомянутого запроса каждую минуту и ​​отправлять записи в Kafka.

Временно я делаю CDC с использованием Spark Core JDBC, обрабатываю записи, отправляю в Kafka и поддерживаю информацию CDC вместе с ${lastExtractUnixTime} в HBase как таблица Феникса. Задание запланировано на каждый 1-минутный интервал.

Как долгосрочное решение, мы планируем использовать Apache Nifi для выполнения CDC и публиковать информацию в Kafka, Spark Streaming будет читать сообщения от Kafka, применять некоторую бизнес-логику и отправлять обогащенные данные в другую тему Kafka.; Я не нахожу подходящий процессор, который поможет мне динамически передавать ${lastExtractUnixTime} в SQL и получать дельта-записи каждые 1 или 2 минуты.

Пожалуйста, предложите, как это можно сделать с помощью Apache Nifi.

0 ответов

Другие вопросы по тегам