Не удалось инвертировать исключение биекции в Твиттере в приложении потоковой передачи Kafka с использованием avro:Java

Я следовал руководству по созданию и потреблению сообщений от kafka с использованием потоковой передачи искры. Идея состоит в том, чтобы создать простое сообщение, которое сериализуется в формате avro. Десериализовать сообщения из формата avro и использовать их при помощи потокового воспроизведения. Я не могу использовать сообщение, поскольку API-интерфейс bijection создает исключение Failed to Invert.

Режиссер:

public static final String schema = "{"
+"\"fields\": ["
+   " { \"name\": \"str1\", \"type\": \"string\" },"
+   " { \"name\": \"str2\", \"type\": \"string\" },"
+   " { \"name\": \"int1\", \"type\": \"int\" }"
+"],"
+"\"name\": \"myrecord\","
+"\"type\": \"record\""
+"}"; 

public static void startAvroProducer() throws InterruptedException{
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "Kafka Avro Producer");

    Schema.Parser parser = new Schema.Parser();
    Schema schema = parser.parse(AvroProducer.schema);

    Injection<GenericRecord, byte[]> inject = GenericAvroCodecs.toBinary(schema);

    KafkaProducer<String,byte[]> producer = new KafkaProducer<String,byte[]>(props);
    for(int i=0;i<1000;i++){
        GenericData.Record record = new GenericData.Record(schema);
        record.put("str1", "str1-"+i);
        record.put("str2", "str2-"+i);
        record.put("int1", i);

        byte[] bytes = inject.apply(record);

        ProducerRecord<String,byte[]> producerRec = new ProducerRecord<String, byte[]>("jason", bytes);
        producer.send(producerRec);
        Thread.sleep(250);

    }

    producer.close();
}

Потребитель:

 private static SparkConf sc = null;
        private static JavaSparkContext jsc = null;
        private static JavaStreamingContext jssc = null;
        private static Injection<GenericRecord,byte[]> inject = null;

        static {
            Schema.Parser parser = new Schema.Parser();
            Schema schema = parser.parse(AvroProducer.schema);
            inject = GenericAvroCodecs.apply(schema);
        }

        public static void startAvroConsumer() throws InterruptedException {
            sc = new SparkConf().setAppName("Spark Avro Streaming Consumer")
                    .setMaster("local[*]");
            jsc = new JavaSparkContext(sc);
            jssc = new JavaStreamingContext(jsc, new Duration(200));

            Set<String> topics = Collections.singleton("jason");
            Map<String, String> kafkaParams = new HashMap<String, String>();
            kafkaParams.put("metadata.broker.list", "localhost:9092");
            JavaPairInputDStream<String, byte[]> inputDstream = KafkaUtils
                    .createDirectStream(jssc, String.class, byte[].class,
                            StringDecoder.class, DefaultDecoder.class, kafkaParams,
                            topics);

            inputDstream.map(message -> inject.invert(message._2).get()).foreachRDD(rdd -> {
                    rdd.foreach(record -> {
                        System.out.println(record.get("str1"));
                        System.out.println(record.get("str2"));
                        System.out.println(record.get("int1"));
                    });
                });

            jssc.start();
            jssc.awaitTermination();
        }

Исключение:

com.twitter.bijection.InversionFailure: Failed to invert: [B@3679b3f6
    at com.twitter.bijection.InversionFailure$$anonfun$partialFailure$1.applyOrElse(InversionFailure.scala:43)
    at com.twitter.bijection.InversionFailure$$anonfun$partialFailure$1.applyOrElse(InversionFailure.scala:42)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at scala.util.Failure.recoverWith(Try.scala:203)
    at com.twitter.bijection.Inversion$.attempt(Inversion.scala:32)
    at com.twitter.bijection.avro.GenericAvroCodec.invert(AvroCodecs.scala:293)
    at com.twitter.bijection.avro.GenericAvroCodec.invert(AvroCodecs.scala:276)
    at com.applications.streaming.consumers.AvroConsumer.lambda$0(AvroConsumer.java:54)
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Not a data file.
    at org.apache.avro.file.DataFileStream.initialize(DataFileStream.java:105)
    at org.apache.avro.file.DataFileStream.<init>(DataFileStream.java:84)
    at com.twitter.bijection.avro.GenericAvroCodec$$anonfun$invert$2.apply(AvroCodecs.scala:295)
    at com.twitter.bijection.avro.GenericAvroCodec$$anonfun$invert$2.apply(AvroCodecs.scala:293)
    at com.twitter.bijection.Inversion$$anonfun$attempt$1.apply(Inversion.scala:32)
    at scala.util.Try$.apply(Try.scala:192)
    ... 18 more

0 ответов

Другие вопросы по тегам