Spark - JavaPairRDD saveAsHadoopFile в AvroOutputFormat

Я пытаюсь спасти JavaPairRDD в файл avro со следующим кодом

JavaPairRDD<String, Float> j = existingRDD.mapToPair().combineByKey().mapToPair();

j.saveAsHadoopFile("/hdfsPath/avro/", String.class, Float.class, AvroOutputFormat.class);

Но я получаю NullPointerExceptionна второй линии

java.lang.NullPointerException
at java.io.StringReader.<init>(StringReader.java:50)
at org.apache.avro.Schema$Parser.parse(Schema.java:1012)
at org.apache.avro.Schema.parse(Schema.java:1064)
at org.apache.avro.mapred.AvroJob.getOutputSchema(AvroJob.java:143)
at org.apache.avro.mapred.AvroOutputFormat.getRecordWriter(AvroOutputFormat.java:153)
at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:91)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1191)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1183)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Это, вероятно, из-за того, что я не использую saveAsHadoopFile правильно, потому что я не получаю никакой ошибки при использовании

j.saveAsTextFile("/hdfsPath/avro/");
//OR
j.saveAsHadoopFile("/user/cloudera/avro/", String.class, Float.class, TextOutputFormat.class);

Функция Pair передана mapToPair возвращает Tuple2<String, Float>, Кроме того, вместо AvroOutputFormat.class в saveAsHadoopFile метод, я попытался создать свой собственный класс и расширить его AvroOutputFormat,

public class CombineOutput extends AvroOutputFormat{
  String department;
  Float avgSal;
}

который был принят как

j.saveAsHadoopFile("/hdfsPath/avro/", String.class, Float.class, CombineOutput.class);

Но это дало мне то же самое NullPointerException,

Я не мог найти какой-либо ресурс в сети относительно saveAsHadoopFile с AvroOutputFormat на Яве. Может ли кто-нибудь помочь мне с этим?

Я использую Spark 1.6.0

0 ответов

Другие вопросы по тегам