Apache NiFi: FlowFileHandlingException при передаче FlowFile в пользовательский процессор

Я выполняю прием данных из удаленного сервиса API для разных временных интервалов в своем собственном процессоре NiFi.

У меня есть глобальный счетчик временных диапазонов, который обновляется с каждой итерацией (я использую стратегию планирования по таймеру).

Когда счетчик больше максимального значения, я хочу передать только FlowFile из запроса (session.get()) с отношением УСПЕХ, т.е. без выполнения дополнительной логики:

session.transfer(requestFlowFile, SUCCESS);

Я понимаю, что не могу остановить или приостановить процессор, когда закончился сбор временных диапазонов. Поэтому я пытаюсь использовать вышеуказанный подход в качестве решения.

Все итерации идут нормально, пока счетчик не станет больше максимума и процессор попытается передать FlowFile из запроса (session.get())

Итак, у меня есть это исключение:

не удалось обработать сеанс из-за org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=459e615b-0ff5-424f-aac7-f95d364cdc13, заявка =, смещение =0, имя =99628180019265, известная как размер =0] не известна на этой сессии

Что здесь не так? Или может быть другой подход?

2 ответа

Эта ошибка означает, что файл потока, передаваемый в session.transfer(), пришел из другого сеанса. Вы можете вызывать Transfer () только в том же сеансе, из которого вы вызвали get().

Если это кастомный процессор - просто не делай session.get() и пропустить это выполнение, ничего не передавая.

или если вам нужен входящий файл, чтобы принять решение, вы можете его получить, сделать некоторые проверки и откатить текущую сессию с штрафом rollback(true)Таким образом, полученные файлы останутся во входящей очереди во время Penalty Duration параметр процессора без запуска.

или вы можете сделать session.get( FlowFileFilter) получать из входящей очереди только файлы, соответствующие вашей логике

Другие вопросы по тегам