Скрипт Python с использованием ExecuteStreamCommand

Сделав все возможное, чтобы найти предыдущие вопросы и примеры, относящиеся к этому вопросу, и все еще не находя ответов, которые я ищу, я решил, что я сам задам вопрос.

ExecuteStreamCommand кажется мне идеальным процессором по следующим причинам:

  • Я могу выполнить любой скрипт Python и избежать Jython (аналогично ExecuteScript). Jython не вариант для меня.
  • Я могу взять в FlowFiles. Это необходимо, так как мой скрипт создан для использования выходных данных предыдущего процессора. Кроме того, мне нравится идея хранения данных в разделе "Управление NiFi".
  • Он записывает "статус выполнения", который будет полезен для маршрутизации.

В двух словах, что я пытаюсь сделать с ExecuteStreamCommand:

  • Загрузка результатов предыдущего процессора (паук Scrapy, который выводит текстовый файл с точками JSON)
  • Вызовите скрипт на Python (например, python3 my_script.py)
  • Загрузите FlowFile, который был загружен в мой скрипт на python.
  • Выберите содержимое FlowFile.
  • Работайте с содержимым FlowFile в Python.
  • Выведите либо обновленную версию исходного FlowFile, либо создайте новую.
  • Продолжите мой поток NiFi с обновленным / новым FlowFile.

Для ясности я сейчас не понимаю:

  • Как вызвать скрипт Python (из процессора ExecuteStreamCommand)
  • Как загрузить FlowFile из Python
  • Как обновить или создать новый FlowFile из Python
  • Как вывести обновленный FlowFile из Python обратно в NiFi.

Я сталкивался с различными примерами для ExecuteScript, но, к сожалению, они не совсем соответствуют использованию ExecuteStreamCommand.

Заранее спасибо. Любой совет приветствуется.

1 ответ

Решение

От вашего вопроса вы говорите, что вам нужно вызвать скрипт Python без использования InvokeScriptedProcessor или же ExecuteScript процессоры, потому что вы не можете использовать Jython. Учитывая это требование, вы все равно сможете достичь своей цели. Хотя это требует некоторого знакомства со структурой, вся эта информация взята из ExecuteStreamCommand документация

Ваш раздел "Я сейчас не понимаю":

  • Как вызвать скрипт Python (из процессора ExecuteStreamCommand)

    • В вашем ExecuteStreamCommand процессор, настройте свойства аргументов команды и путь к команде следующим образом:

      • Командные аргументы: any flags or args, delimited by ; (т.е. /path/to/my_script.py)
      • Путь к команде: /path/to/python3
  • Как загрузить FlowFile из Python

    • Содержимое потокового файла будет передаваться через STDIN, поэтому в вашем скрипте Python эти данные обрабатываются так же, как при обычной обработке STDIN.
  • Как обновить или создать новый FlowFile из Python
    • NiFi управляет созданием потокового файла в рамках. Любые данные, переданные вашим скриптом Python в STDOUT, будут заполнены содержимым результирующего потокового файла, переданного в отношение выходного потока ExecuteStreamCommand процессор. В этом случае вашему сценарию не нужно иметь представление о "потоковых файлах". Если бы вы вместо этого использовали ISP или же ES процессоры, вы можете использовать API сценариев NiFi, который автоматически внедряется в сценарии для создания или обновления объекта flowfile.
  • Как вывести обновленный FlowFile из Python обратно в NiFi.
    • Опять же, просто запишите желаемое содержимое потокового файла в STDOUT из вашего скрипта и (учитывая код состояния возврата 0) NiFi создаст новый потоковый файл с этим контентом. Если вы установите свойство Выходной атрибут назначения в ESC к ненулевому значению, NiFi вместо этого обновит существующий потоковый файл новым атрибутом с тем же именем, содержащим выходные данные скрипта.
Другие вопросы по тегам