NiFi, поток с KafkaConsumer, чтобы написать как JSON
В настоящее время я застрял в следующей проблеме: я читаю сообщения из темы Kafka с помощью KafkaConsumer. Сообщения являются строками и имеют следующий формат:
{ "a" : "b", "a1" : "b1", "c2" : "c3" }
Они сохраняются в полезной нагрузке FlowFile.
Я хочу преобразовать эту строку в JSON или в идеале в CSV, но не могу понять, как это сделать.
Я новичок в NiFi и исследовал как можно больше, но ответы, которые я нашел, были относительно преобразований из JSON в AVRO или аналогичные, но никогда не строки в JSON или AVRO. Я также обнаружил, что сообщение Kafka находится в полезной нагрузке FlowFile, а не в атрибутах, поэтому я понятия не имею, как его достать, поскольку в примерах всегда используются атрибуты.
Вкратце: могу ли я преобразовать полезную нагрузку FlowFile, которая является строкой, в json / cvs с некоторым встроенным процессором.
2 ответа
Если ваше сообщение находится в FlowFile, может помочь следующая последовательность:
1) Используйте AttributesToJson для преобразования сообщения полезной нагрузки в Json. 2) Используйте EvaluateJsonPath для извлечения сообщения полезной нагрузки. В вашем случае сообщение кафка. Затем вы можете передать извлеченные сообщения для генерации CSV.
Этот пост может помочь конвертировать Json в CSV: конвертировать Json в CSV
Я закончил тем, что сделал это:
- ConsumeKafka дает мне строку:
{ "a" : "b", "a1" : "b1" }
- EvaluateJsonPath создает атрибуты, добавляя свойства
a -> $.a //results in attribute named a with value b
a1 -> $.a1 //results in attribute named a1 with value b1
- ReplaceText получает атрибуты из EvaluateJsonPath для формирования одного единственного csv-формата:
Replacement value -> ${'a'},${'a1'}
Это приводит к единственной строке, НО НЕТ НОВОЙ ЛИНИИ:
b,b1
Чтобы добавить новую строку с добавлением \ n, '\ n', "\ n" не работает.Что работало, так это нажатие клавиш Shift + Enter при вводе в поле значения замены, в результате чего была создана пустая новая строка.