Получено исключение при вызове метода readJson MorphlineInterceptor

Я просто хочу получить данные Json из Kafka, а затем выполнить некоторые преобразования с помощью Morphline. Я просто получаю это исключение:

ОШИБКА kafka.KafkaSource: KafkaSource EXCEPTION, {} org.apache.flume.FlumeException: > org.apache.flume.sink.solr.morphline.MorphlineInterceptor$LocalMorphlineInterc> eptor не должен создавать вложения, которые не являются байтами [] или InputStreamte[]

это моя конфигурация лотка:


# Name the components on this agent
a1.sources = kafka-source
a1.sinks = console
a1.channels = mem-channel

# Describe/configure the source
a1.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.kafka-source.batchSize = 100
a1.sources.kafka-source.kafka.bootstrap.servers = 10.20.80.220:9092
a1.sources.kafka-source.kafka.topics = worm_video_info
a1.sources.kafka-source.kafka.consumer.group.id = flume-consumer-2
a1.sources.kafka-source.kafka.consumer.auto.offset.reset=earliest
a1.sources.kafka-source.interceptors = morphlineinterceptor
a1.sources.kafka-source.interceptors.morphlineinterceptor.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
a1.sources.kafka-source.interceptors.morphlineinterceptor.morphlineFile = morphline-test.conf
a1.sources.kafka-source.interceptors.morphlineinterceptor.morphlineId = morphline-test-id


# Describe the sink
a1.sinks.console.type = logger


# Use a channel which buffers events in memory
a1.channels.mem-channel.type = memory
a1.channels.mem-channel.capacity = 1000
a1.channels.mem-channel.transactionCapacity = 100


# Bind the source and sink to the channel
a1.sources.kafka-source.channels = mem-channel
a1.sinks.console.channel = mem-channel

это моя конфигурация морфлайна:

morphlines : [
    {
        id : morphline-test-id
        importCommands : ["org.kitesdk.**"]

        commands : [
            {
                readJson{
                }
            }

        ]
    }
]

alomost просто прочитал Json. Здесь что-то не так? Пожалуйста, помогите, спасибо!

Я проверил запись с помощью этого кода

morphlines : [
    {
        id : morphline-test-id
        importCommands : ["org.kitesdk.**"]

        commands : [
            {
                java {
          imports : "import java.nio.charset.StandardCharsets;"
                    code: """
                    logger.debug("++++++++++@11111++++++record: {}", new String((byte[])(record.get("_attachment_body").get(0)),StandardCharsets.UTF_8));
          logger.debug("++++++++++@2222++++++ is byte[]? {}", record.get("_attachment_body").get(0) instanceof byte[] );                                            
                    return child.process(record); // pass record to next command in chain
                            """

                }
            }
            {
                # Parse input attachment and emit a record for each input line              
                readLine {
                    charset : UTF-8
                }
            }

        ]
    }
]

[_attachment_body] на самом деле является байтом []

0 ответов

Другие вопросы по тегам