Spark - JavaPairRDD saveAsHadoopFile в AvroOutputFormat
Я пытаюсь спасти JavaPairRDD
в файл avro со следующим кодом
JavaPairRDD<String, Float> j = existingRDD.mapToPair().combineByKey().mapToPair();
j.saveAsHadoopFile("/hdfsPath/avro/", String.class, Float.class, AvroOutputFormat.class);
Но я получаю NullPointerException
на второй линии
java.lang.NullPointerException
at java.io.StringReader.<init>(StringReader.java:50)
at org.apache.avro.Schema$Parser.parse(Schema.java:1012)
at org.apache.avro.Schema.parse(Schema.java:1064)
at org.apache.avro.mapred.AvroJob.getOutputSchema(AvroJob.java:143)
at org.apache.avro.mapred.AvroOutputFormat.getRecordWriter(AvroOutputFormat.java:153)
at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:91)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1191)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1183)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Это, вероятно, из-за того, что я не использую saveAsHadoopFile
правильно, потому что я не получаю никакой ошибки при использовании
j.saveAsTextFile("/hdfsPath/avro/");
//OR
j.saveAsHadoopFile("/user/cloudera/avro/", String.class, Float.class, TextOutputFormat.class);
Функция Pair передана mapToPair
возвращает Tuple2<String, Float>
, Кроме того, вместо AvroOutputFormat.class
в saveAsHadoopFile
метод, я попытался создать свой собственный класс и расширить его AvroOutputFormat
,
public class CombineOutput extends AvroOutputFormat{
String department;
Float avgSal;
}
который был принят как
j.saveAsHadoopFile("/hdfsPath/avro/", String.class, Float.class, CombineOutput.class);
Но это дало мне то же самое NullPointerException
,
Я не мог найти какой-либо ресурс в сети относительно saveAsHadoopFile
с AvroOutputFormat
на Яве. Может ли кто-нибудь помочь мне с этим?
Я использую Spark 1.6.0