Триггер FetchFolder в NiFi?
Я использую NiFi для организации обработки больших двоичных файлов, используя собственный инструмент обработки (который работает вне NiFi).
NiFi сбрасывает исходные файлы на диск, я называю внешний инструмент (использующий процессор ExecuteScript), инструмент загружает двоичный файл и продолжает генерировать множество файлов меньшего размера.
Когда внешний инструмент полностью завершен, мне нужно "подобрать" каталог более мелких (сгенерированных) файлов и продолжить обработку через NiFi. Мне нужно подождать, потому что [каталог вывода], [количество файлов] и [время, необходимое для обработки] являются динамическими.
Эта проблема:
- GetFile (для захвата каталога) не имеет восходящего соединения, поэтому я не могу запустить его после завершения обработки.
- Комбинация ListFile + FetchFile не работает, потому что ListFile не имеет восходящего соединения, поэтому - опять же - я не могу вызвать его после завершения обработки.
... так какой процессор (ы) я могу использовать, чтобы после завершения двоичной обработки получить каталог новых файлов и перенести их на землю NiFi?
2 ответа
Отчасти в соответствии с ответом @Bryan Bende, я использовал ExecuteScript
процессор для создания процессора ListFile, который предлагает восходящее соединение:
import java.nio.charset.StandardCharsets
import groovy.io.FileType
def flowFile = session.get()
if(!flowFile) return
def flowFiles = [] as List<FlowFile>
def fetchDirectory = flowFile.getAttribute('fetchDirectory')
def listOfFiles = []
def dir = new File(fetchDirectory)
if(dir.exists()) {
dir.eachFileRecurse (FileType.FILES) { file ->
listOfFiles << file
}
}
listOfFiles.each { i ->
def newFlowFile = session.create()
session.putAttribute(newFlowFile, 'path', i.path)
session.putAttribute(newFlowFile, 'filename', i.getName())
flowFiles << newFlowFile
}
session.remove(flowFile)
session.transfer(flowFiles, REL_SUCCESS)
Итак, когда внешний инструмент завершает работу, я перенаправляю FlowFile блока на вышеупомянутый процессор, который затем перенаправляем в FetchFile
процессор.
Я собираюсь предположить, что у вашего внешнего инструмента есть способ уведомить NiFi, когда это будет сделано, поскольку вам это понадобится, даже если GetFile или ListFile поддерживают входящие файлы потока.
Так как насчет двухэтапного процесса...
Внешний инструмент записывает данные в каталог-1, а по завершении выполняет вызов REST API, предоставляемого процессором HandleHttpRequest, который затем переходит к процессору ExecuteScript, который вызывает "каталог-1 каталога-1 mv".
Процессор ListFile всегда наблюдает за каталогом-2, но никогда ничего не видит, пока не выполнится команда перемещения выше.