Исключение «Failed to invert» библиотеки Twitter Bijection в приложении Kafka + Spark Streaming с использованием Avro (Java)
Я следовал руководству по отправке и потреблению сообщений Kafka с использованием Spark Streaming. Идея состоит в том, чтобы отправлять простые сообщения, сериализованные в формате Avro, затем десериализовывать их из формата Avro и обрабатывать с помощью Spark Streaming. Я не могу обработать сообщения, поскольку API Bijection выбрасывает исключение «Failed to invert».
Производитель (Producer):
// Avro schema for the test record: two string fields and one int field.
public static final String schema =
        "{\"fields\": ["
        + " { \"name\": \"str1\", \"type\": \"string\" },"
        + " { \"name\": \"str2\", \"type\": \"string\" },"
        + " { \"name\": \"int1\", \"type\": \"int\" }"
        + "],"
        + "\"name\": \"myrecord\","
        + "\"type\": \"record\"}";
/**
 * Sends 1000 Avro-encoded records to the "jason" topic, one every 250 ms.
 *
 * <p>Records are serialized with {@code GenericAvroCodecs.toBinary}, i.e. as
 * raw Avro binary WITHOUT the data-file container header. Any consumer must
 * decode with the matching {@code toBinary(schema)} codec, otherwise
 * inversion fails with "java.io.IOException: Not a data file".
 *
 * @throws InterruptedException if the throttling sleep is interrupted
 */
public static void startAvroProducer() throws InterruptedException {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "Kafka Avro Producer");

    Schema.Parser parser = new Schema.Parser();
    Schema schema = parser.parse(AvroProducer.schema);
    Injection<GenericRecord, byte[]> inject = GenericAvroCodecs.toBinary(schema);

    // try-with-resources: the original leaked the producer when send() or
    // sleep() threw before close(); KafkaProducer is AutoCloseable and
    // close() also flushes any buffered records.
    try (KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props)) {
        for (int i = 0; i < 1000; i++) {
            GenericData.Record record = new GenericData.Record(schema);
            record.put("str1", "str1-" + i);
            record.put("str2", "str2-" + i);
            record.put("int1", i);
            byte[] bytes = inject.apply(record);
            ProducerRecord<String, byte[]> producerRec =
                    new ProducerRecord<String, byte[]>("jason", bytes);
            producer.send(producerRec);
            Thread.sleep(250); // throttle to ~4 messages/second
        }
    }
}
Потребитель (Consumer):
private static SparkConf sc = null;
private static JavaSparkContext jsc = null;
private static JavaStreamingContext jssc = null;
// Codec shared by all executor tasks; initialized once per JVM below.
private static Injection<GenericRecord, byte[]> inject = null;
static {
    Schema.Parser parser = new Schema.Parser();
    Schema schema = parser.parse(AvroProducer.schema);
    // BUG FIX: the producer serializes with GenericAvroCodecs.toBinary(schema)
    // (raw Avro binary, no container header), but this side used
    // GenericAvroCodecs.apply(schema), whose invert() reads the Avro
    // data-file container format via DataFileStream — hence
    // "java.io.IOException: Not a data file" wrapped in InversionFailure.
    // Producer and consumer must use the SAME codec.
    inject = GenericAvroCodecs.toBinary(schema);
}
/**
 * Consumes Avro-encoded records from the "jason" topic via the Kafka direct
 * stream, decodes each payload back into a GenericRecord with the shared
 * {@code inject} codec, and prints the three record fields. Blocks until the
 * streaming context terminates.
 *
 * @throws InterruptedException if awaiting termination is interrupted
 */
public static void startAvroConsumer() throws InterruptedException {
    sc = new SparkConf().setAppName("Spark Avro Streaming Consumer")
            .setMaster("local[*]");
    jsc = new JavaSparkContext(sc);
    jssc = new JavaStreamingContext(jsc, new Duration(200));

    Map<String, String> kafkaConfig = new HashMap<String, String>();
    kafkaConfig.put("metadata.broker.list", "localhost:9092");
    Set<String> topicSet = Collections.singleton("jason");

    JavaPairInputDStream<String, byte[]> stream = KafkaUtils
            .createDirectStream(jssc, String.class, byte[].class,
                    StringDecoder.class, DefaultDecoder.class, kafkaConfig,
                    topicSet);

    // Decode each (key, bytes) pair back into a GenericRecord, then dump
    // the record's fields on every batch.
    stream.map(pair -> inject.invert(pair._2).get())
            .foreachRDD(batch -> batch.foreach(rec -> {
                System.out.println(rec.get("str1"));
                System.out.println(rec.get("str2"));
                System.out.println(rec.get("int1"));
            }));

    jssc.start();
    jssc.awaitTermination();
}
Исключение:
com.twitter.bijection.InversionFailure: Failed to invert: [B@3679b3f6
at com.twitter.bijection.InversionFailure$$anonfun$partialFailure$1.applyOrElse(InversionFailure.scala:43)
at com.twitter.bijection.InversionFailure$$anonfun$partialFailure$1.applyOrElse(InversionFailure.scala:42)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at scala.util.Failure.recoverWith(Try.scala:203)
at com.twitter.bijection.Inversion$.attempt(Inversion.scala:32)
at com.twitter.bijection.avro.GenericAvroCodec.invert(AvroCodecs.scala:293)
at com.twitter.bijection.avro.GenericAvroCodec.invert(AvroCodecs.scala:276)
at com.applications.streaming.consumers.AvroConsumer.lambda$0(AvroConsumer.java:54)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Not a data file.
at org.apache.avro.file.DataFileStream.initialize(DataFileStream.java:105)
at org.apache.avro.file.DataFileStream.<init>(DataFileStream.java:84)
at com.twitter.bijection.avro.GenericAvroCodec$$anonfun$invert$2.apply(AvroCodecs.scala:295)
at com.twitter.bijection.avro.GenericAvroCodec$$anonfun$invert$2.apply(AvroCodecs.scala:293)
at com.twitter.bijection.Inversion$$anonfun$attempt$1.apply(Inversion.scala:32)
at scala.util.Try$.apply(Try.scala:192)
... 18 more