Как я могу передать атрибут flowfile в службу контроллера?

Привет, Интернет, Улей, Ум!

Мне нужно запросить AWS Athena с помощью nifi, однако мне нужно поменять промежуточную директорию (корзину S3 и папку, в которой будут сохранены результаты) для каждого отправленного запроса.

Но свойство s3_staging_dir должно быть установлено в службе контроллера DBCPConnectionPool. Как я могу изменить значение этого свойства для каждого отдельного файла потока? Очевидно это не может быть получено одним языком выражения.

Спасибо!

2 ответа

Решение

Я не уверен в характере вашего потока, где каждый запрос зависит от отдельной промежуточной директории, но есть несколько вещей, которые нужно иметь в виду.

  1. DBCPConnectionPool Служба контроллера допускает динамические свойства, которые оценивают язык выражений, но эта оценка языка выражений выполняется, когда служба контроллера включена, поэтому "один раз" на запуск / останов.
  2. Динамические свойства в службе контроллера не оценивают атрибуты потокового файла.

От Apache NiFi DBCPConnectionPool документация:

Динамические свойства:

Динамические свойства позволяют пользователю указывать как имя, так и значение свойства.

...

Задает имя свойства и значение, которое должно быть установлено в соединениях JDBC. Если используется язык выражений, оценка будет выполняться после включения службы контроллера. Обратите внимание, что для этих свойств не доступен ввод файла потока (например, атрибуты) для использования в конструкциях языка выражений.Поддерживает язык выражений: true

Из-за вашего требования, что промежуточный каталог S3 отличается при каждом запросе, я думаю, что в этом случае вам нужно будет воспользоваться одним из следующих вариантов:

  1. Подайте Jira с просьбой поддержки родной Athena в NiFi (подробно объясните, почему существующийDBCPConnectionPoolне поддерживает ваш вариант использования)
  2. Расширить DBCPConnectionPoolслужба контроллера с вашим собственнымAthenaConnectionPool диспетчер службы. Существует множество учебных пособий по созданию собственных компонентов NiFi, но Руководство разработчика NiFi> Разработка служб контроллеров - лучшее место для начала. Вы можете создать службу контроллера, которая будет оценивать входящие атрибуты потокового файла при выполнении выполнения языка выражений, но вам нужно будет запустить это вручную, так как службы контроллера не имеют @OnTriggerфаза их жизненного цикла. Если вы также пишете пользовательский процессор, вы можете вызвать некоторый метод "переоценки" в службе контроллера изonTrigger() метод процессора, но существующие процессоры не будут называть это. Вместо этого вы могли бы теоретически поставить высокочастотный освежитель в саму службу контроллера, используя исполнителей, но это определенно повлияет на производительность
  3. Создать несколькоDBCPConnectionPoolэкземпляры и процессоры SQL для каждого промежуточного каталога (выполнимо порядка 1 - 3, в противном случае ужасно)
  4. ИспользоватьExecuteStreamCommandпроцессор сawscli выполнять запросы с помощью инструмента командной строки. Это лишает вас собственных инструментов SQL NiFi, но позволяет настраивать запросы при каждом вызове, потому что ExecuteStreamCommand может интерпретировать специфичные для потока файлы атрибуты и использовать их в запросе
  5. Переоцените свой дизайн потока и посмотрите, есть ли способ выполнить запросы, не допуская произвольных промежуточных каталогов S3 при выполнении отдельных запросов

Вам не нужно устанавливать свойство в DBCPConnectionPool, Запрос, который вы задаете в процессоре SQL, будет выводить результаты из Афины в виде потоковых файлов. Вы можете подключить процессор SQL к PutS3Object и укажите имя корзины и другие необходимые свойства. Это запишет результат вашего запроса SQL в промежуточный каталог S3.

Другие вопросы по тегам