Как я могу передать атрибут flowfile в службу контроллера?
Привет, Интернет, Улей, Ум!
Мне нужно запросить AWS Athena с помощью nifi, однако мне нужно поменять промежуточную директорию (корзину S3 и папку, в которой будут сохранены результаты) для каждого отправленного запроса.
Но свойство s3_staging_dir должно быть установлено в службе контроллера DBCPConnectionPool. Как я могу изменить значение этого свойства для каждого отдельного файла потока? Очевидно это не может быть получено одним языком выражения.
Спасибо!
2 ответа
Я не уверен в характере вашего потока, где каждый запрос зависит от отдельной промежуточной директории, но есть несколько вещей, которые нужно иметь в виду.
DBCPConnectionPool
Служба контроллера допускает динамические свойства, которые оценивают язык выражений, но эта оценка языка выражений выполняется, когда служба контроллера включена, поэтому "один раз" на запуск / останов.- Динамические свойства в службе контроллера не оценивают атрибуты потокового файла.
От Apache NiFi DBCPConnectionPool
документация:
Динамические свойства:
Динамические свойства позволяют пользователю указывать как имя, так и значение свойства.
...
Задает имя свойства и значение, которое должно быть установлено в соединениях JDBC. Если используется язык выражений, оценка будет выполняться после включения службы контроллера. Обратите внимание, что для этих свойств не доступен ввод файла потока (например, атрибуты) для использования в конструкциях языка выражений.Поддерживает язык выражений: true
Из-за вашего требования, что промежуточный каталог S3 отличается при каждом запросе, я думаю, что в этом случае вам нужно будет воспользоваться одним из следующих вариантов:
- Подайте Jira с просьбой поддержки родной Athena в NiFi (подробно объясните, почему существующий
DBCPConnectionPool
не поддерживает ваш вариант использования) - Расширить
DBCPConnectionPool
служба контроллера с вашим собственнымAthenaConnectionPool
диспетчер службы. Существует множество учебных пособий по созданию собственных компонентов NiFi, но Руководство разработчика NiFi> Разработка служб контроллеров - лучшее место для начала. Вы можете создать службу контроллера, которая будет оценивать входящие атрибуты потокового файла при выполнении выполнения языка выражений, но вам нужно будет запустить это вручную, так как службы контроллера не имеют@OnTrigger
фаза их жизненного цикла. Если вы также пишете пользовательский процессор, вы можете вызвать некоторый метод "переоценки" в службе контроллера изonTrigger()
метод процессора, но существующие процессоры не будут называть это. Вместо этого вы могли бы теоретически поставить высокочастотный освежитель в саму службу контроллера, используя исполнителей, но это определенно повлияет на производительность - Создать несколько
DBCPConnectionPool
экземпляры и процессоры SQL для каждого промежуточного каталога (выполнимо порядка 1 - 3, в противном случае ужасно) - Использовать
ExecuteStreamCommand
процессор сawscli
выполнять запросы с помощью инструмента командной строки. Это лишает вас собственных инструментов SQL NiFi, но позволяет настраивать запросы при каждом вызове, потому чтоExecuteStreamCommand
может интерпретировать специфичные для потока файлы атрибуты и использовать их в запросе - Переоцените свой дизайн потока и посмотрите, есть ли способ выполнить запросы, не допуская произвольных промежуточных каталогов S3 при выполнении отдельных запросов
Вам не нужно устанавливать свойство в DBCPConnectionPool
, Запрос, который вы задаете в процессоре SQL, будет выводить результаты из Афины в виде потоковых файлов. Вы можете подключить процессор SQL к PutS3Object
и укажите имя корзины и другие необходимые свойства. Это запишет результат вашего запроса SQL в промежуточный каталог S3.