Неправильный тип времени выполнения в RDD при чтении из avro с помощью специального сериализатора
Я пытаюсь прочитать данные из файлов avro в RDD, используя Kryo. Мой код компилируется нормально, но во время выполнения я получаю ClassCastException
, Вот что делает мой код:
SparkConf conf = new SparkConf()...
conf.set("spark.serializer", KryoSerializer.class.getCanonicalName());
conf.set("spark.kryo.registrator", MyKryoRegistrator.class.getName());
JavaSparkContext sc = new JavaSparkContext(conf);
куда MyKryoRegistrator
регистрирует сериализатор для MyCustomClass
:
public void registerClasses(Kryo kryo) {
kryo.register(MyCustomClass.class, new MyCustomClassSerializer());
}
Затем я читаю мой файл данных:
JavaPairRDD<MyCustomClass, NullWritable> records =
sc.newAPIHadoopFile("file:/path/to/datafile.avro",
AvroKeyInputFormat.class, MyCustomClass.class, NullWritable.class,
sc.hadoopConfiguration());
Tuple2<MyCustomClass, NullWritable> first = records.first();
Кажется, это работает нормально, но с помощью отладчика я вижу, что, хотя в RDD есть kClassTag my.package.conisting.MyCustomClass, переменная first
содержит Tuple2<AvroKey, NullWritable>
не Tuple2<MyCustomClass, NullWritable>
! И действительно, когда выполняется следующая строка:
System.out.println("Got a result, custom field is: " + first._1.getSomeCustomField());
Я получаю исключение:
java.lang.ClassCastException: org.apache.avro.mapred.AvroKey cannot be cast to my.package.containing.MyCustomClass
Я делаю что-то неправильно? И даже если я не получу ошибку компиляции, а не ошибку времени выполнения?
1 ответ
*************РЕДАКТИРОВАТЬ**************
Мне удалось загрузить пользовательские объекты из файлов avro и создать GitHub-репозиторий с кодом. Однако если avro lib не удается загрузить данные в пользовательский класс, вместо этого он возвращает объекты GenericData$Record. И в этом случае Spark Java API не проверяет присвоение пользовательскому классу, поэтому вы получаете исключение ClassCastException только при попытке получить доступ к данным AvroKey. Это нарушение гарантии безопасности данных.
*************РЕДАКТИРОВАТЬ**************
Для всех, кто пытается это сделать, у меня есть способ обойти эту проблему, но это не может быть правильным решением: я создал класс для чтения GenericData.Record
из файлов avro:
public class GenericRecordFileInputFormat extends FileInputFormat<GenericData.Record, NullWritable> {
private static final Logger LOG = LoggerFactory.getLogger(GenericRecordFileInputFormat.class);
/**
* {@inheritDoc}
*/
@Override
public RecordReader<GenericData.Record, NullWritable> createRecordReader(
InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
Schema readerSchema = AvroJob.getInputKeySchema(context.getConfiguration());
if (null == readerSchema) {
LOG.warn("Reader schema was not set. Use AvroJob.setInputKeySchema() if desired.");
LOG.info("Using a reader schema equal to the writer schema.");
}
return new GenericDataRecordReader(readerSchema);
}
public static class GenericDataRecordReader extends RecordReader<GenericData.Record, NullWritable> {
AvroKeyRecordReader<GenericData.Record> avroReader;
public GenericDataRecordReader(Schema readerSchema) {
super();
avroReader = new AvroKeyRecordReader<>(readerSchema);
}
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
avroReader.initialize(inputSplit, taskAttemptContext);
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
return avroReader.nextKeyValue();
}
@Override
public GenericData.Record getCurrentKey() throws IOException, InterruptedException {
AvroKey<GenericData.Record> currentKey = avroReader.getCurrentKey();
return currentKey.datum();
}
@Override
public NullWritable getCurrentValue() throws IOException, InterruptedException {
return avroReader.getCurrentValue();
}
@Override
public float getProgress() throws IOException, InterruptedException {
return avroReader.getProgress();
}
@Override
public void close() throws IOException {
avroReader.close();
}
}
}
Затем я загружаю записи:
JavaRDD<GenericData.Record> records = sc.newAPIHadoopFile("file:/path/to/datafile.avro",
GenericRecordFileInputFormat.class, GenericData.Record.class, NullWritable.class,
sc.hadoopConfiguration()).keys();
Затем я конвертирую записи в свой пользовательский класс, используя конструктор, который принимает GenericData.Record
,
Опять же - не красиво, но работает.