Apache Spark с поведением Кассандры
Я пишу отдельную программу Spark, которая получает данные от Кассандры. Я последовал примерам и создал RDD с помощью newAPIHadoopRDD() и класса ColumnFamilyInputFormat. СДР создан, но я получаю NotSerializableException при вызове метода СДР.groupByKey():
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("local").setAppName("Test");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
Job job = new Job();
Configuration jobConf = job.getConfiguration();
job.setInputFormatClass(ColumnFamilyInputFormat.class);
ConfigHelper.setInputInitialAddress(jobConf, host);
ConfigHelper.setInputRpcPort(jobConf, port);
ConfigHelper.setOutputInitialAddress(jobConf, host);
ConfigHelper.setOutputRpcPort(jobConf, port);
ConfigHelper.setInputColumnFamily(jobConf, keySpace, columnFamily, true);
ConfigHelper.setInputPartitioner(jobConf,"Murmur3Partitioner");
ConfigHelper.setOutputPartitioner(jobConf,"Murmur3Partitioner");
SlicePredicate predicate = new SlicePredicate();
SliceRange sliceRange = new SliceRange();
sliceRange.setFinish(new byte[0]);
sliceRange.setStart(new byte[0]);
predicate.setSlice_range(sliceRange);
ConfigHelper.setInputSlicePredicate(jobConf, predicate);
JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> rdd =
spark.newAPIHadoopRDD(jobConf,
ColumnFamilyInputFormat.class.asSubclass(org.apache.hadoop.mapreduce.InputFormat.class),
ByteBuffer.class, SortedMap.class);
JavaPairRDD<ByteBuffer, Iterable<SortedMap<ByteBuffer, IColumn>>> groupRdd = rdd.groupByKey();
System.out.println(groupRdd.count());
}
Исключение:
java.io.NotSerializableException: java.nio.HeapByteBuffer в java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1164) в java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.Ov.Ov.Ov.Ov.Ov.Ov.Ovava.jpg) в java.io. ObjectOutputStream.java:1483) в java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400) в java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158) в java.io.StreamOutOutOutOut) в org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) в org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:179) в org.apache.spark.scheleapThuffer.hu $ anonfun $ runTask $ 1.apply (ShuffleMapTask.scala: 161) в org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:158) в scala.collection.achrator (class). Iterator.scala:727) в org.apache.spark.InterruptibleIterator.foreach(прерывание tibleIterator.scala:28) в org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158) в org.apache.spark.scheduler.ShuffleMapTask.runTask.SharkSk..scheduler.Task.run(Task.scala:51) в org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) в java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895) в java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918) в java.lang.Thread.run(Thread.java:662)
Я пытаюсь объединить все столбцы ключей строк в одну запись. Я также получаю то же исключение, когда пытаюсь использовать метод reduByKey () следующим образом:
JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> reducedRdd = rdd.reduceByKey(
new Function2<SortedMap<ByteBuffer, IColumn>, SortedMap<ByteBuffer, IColumn>, sortedMap<ByteBuffer, IColumn>>() {
public SortedMap<ByteBuffer, IColumn> call(SortedMap<ByteBuffer, IColumn> arg0,
SortedMap<ByteBuffer, IColumn> arg1) throws Exception {
SortedMap<ByteBuffer, IColumn> sortedMap = new TreeMap<ByteBuffer, IColumn>(arg0.comparator());
sortedMap.putAll(arg0);
sortedMap.putAll(arg1);
return sortedMap;
}
}
);
Я использую:
- искровым 1.0.0-бен-hadoop1
- Кассандра 1.2.12
- Java 1.6
Кто-нибудь знает в чем проблема? Что это там, что не удается сериализации?
Спасибо,
Шай
1 ответ
Ваша проблема вызвана, вероятно, попыткой сериализации ByteBuffers. Они не являются сериализуемыми, и вам нужно преобразовать их в массивы байтов перед созданием RDD.
Вам следует попробовать официальный драйвер DataStax Cassandra для Spark, который доступен здесь