Получено исключение при вызове метода readJson MorphlineInterceptor
Я просто хочу получить данные Json из Kafka, а затем выполнить некоторые преобразования с помощью Morphline. Я просто получаю это исключение:
ОШИБКА kafka.KafkaSource: KafkaSource EXCEPTION, {} org.apache.flume.FlumeException: > org.apache.flume.sink.solr.morphline.MorphlineInterceptor$LocalMorphlineInterc> eptor не должен создавать вложения, которые не являются байтами [] или InputStreamte[]
это моя конфигурация лотка:
# Name the components on this agent
a1.sources = kafka-source
a1.sinks = console
a1.channels = mem-channel
# Describe/configure the source
a1.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.kafka-source.batchSize = 100
a1.sources.kafka-source.kafka.bootstrap.servers = 10.20.80.220:9092
a1.sources.kafka-source.kafka.topics = worm_video_info
a1.sources.kafka-source.kafka.consumer.group.id = flume-consumer-2
a1.sources.kafka-source.kafka.consumer.auto.offset.reset=earliest
a1.sources.kafka-source.interceptors = morphlineinterceptor
a1.sources.kafka-source.interceptors.morphlineinterceptor.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
a1.sources.kafka-source.interceptors.morphlineinterceptor.morphlineFile = morphline-test.conf
a1.sources.kafka-source.interceptors.morphlineinterceptor.morphlineId = morphline-test-id
# Describe the sink
a1.sinks.console.type = logger
# Use a channel which buffers events in memory
a1.channels.mem-channel.type = memory
a1.channels.mem-channel.capacity = 1000
a1.channels.mem-channel.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.kafka-source.channels = mem-channel
a1.sinks.console.channel = mem-channel
это моя конфигурация морфлайна:
morphlines : [
{
id : morphline-test-id
importCommands : ["org.kitesdk.**"]
commands : [
{
readJson{
}
}
]
}
]
alomost просто прочитал Json. Здесь что-то не так? Пожалуйста, помогите, спасибо!
Я проверил запись с помощью этого кода
morphlines : [
{
id : morphline-test-id
importCommands : ["org.kitesdk.**"]
commands : [
{
java {
imports : "import java.nio.charset.StandardCharsets;"
code: """
logger.debug("++++++++++@11111++++++record: {}", new String((byte[])(record.get("_attachment_body").get(0)),StandardCharsets.UTF_8));
logger.debug("++++++++++@2222++++++ is byte[]? {}", record.get("_attachment_body").get(0) instanceof byte[] );
return child.process(record); // pass record to next command in chain
"""
}
}
{
# Parse input attachment and emit a record for each input line
readLine {
charset : UTF-8
}
}
]
}
]
[_attachment_body] на самом деле является байтом []