Чтение сообщений Avro от Kafka с использованием структурированной потоковой передачи в Spark 2.1
Я следил за сообщением @Ralph Gonzalez в этой теме, читая сообщения Avro от Kafka с использованием структурированного потокового вещания в Spark 2.1, но получаю следующую ошибку.
org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -40
at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:363)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:355)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:157)
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
at com.expedia.ecpr.risk.data.spark.streaming.MyMain$$anonfun$1.apply(StreamingDecisionJson.scala:99)
at com.expedia.ecpr.risk.data.spark.streaming.MyMain$$anonfun$1.apply(StreamingDecisionJson.scala:98)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Я наткнулся на пост @Michael G. Noll, в котором предлагается использовать DataFileReader вместо binaryDecoder, как показано ниже.
DatumReader<GenericRecord> datumReader = new SpecificDatumReader<GenericRecord>(schema);
DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(inputStream, datumReader);
Я пытался использовать это в Scala, но безуспешно. Ниже приведено текущее состояние кода.
def main (args: Array [String]) {
val KafkaBroker = "**.**.**.**:9092";
val InTopic = "avro";
// Get Spark session
val session = SparkSession
.builder
.master("local[*]")
.appName("myapp")
.getOrCreate()
// Load streaming data
import session.implicits._
//val msg=data.selectExpr("CAST(value AS Array[Byte])")
//val rec = reader.read(null, decoder.binaryDecoder(msg, null))
//val disp=msg.writeStream.outputMode("append").format("console").start()
val data = session
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", KafkaBroker)
.option("subscribe", InTopic)
.load()
.select($"value".as[Array[Byte]])
.map(d => {
val rec = reader.read(null, decoder.binaryDecoder(d, null))
val payload = rec.get("payload").asInstanceOf[Byte].toString
new KafkaMessage(payload)
})
val query = data.writeStream
.outputMode("Append")
.format("console")
.start()
query.awaitTermination()
}
Моя схема и класс case выглядят так, как показано ниже
case class KafkaMessage(
payload: String )
val schemaString = """{
"type" : "record",
"name" : "HdfsEvent",
"namespace" : "com.expedia.txb.domain.hdfs",
"fields" : [ {
"name" : "payload",
"type" : {
"type" : "bytes",
"java-class" : "[B"
}
} ]
}"""
Я буквально потратил последние 2 дня на это, и поэтому любая помощь будет высоко оценена. Благодарю.