Неправильный тип времени выполнения в RDD при чтении из avro с помощью специального сериализатора

Я пытаюсь прочитать данные из файлов avro в RDD, используя Kryo. Мой код компилируется нормально, но во время выполнения я получаю ClassCastException, Вот что делает мой код:

SparkConf conf = new SparkConf()...
conf.set("spark.serializer", KryoSerializer.class.getCanonicalName());
conf.set("spark.kryo.registrator", MyKryoRegistrator.class.getName());
JavaSparkContext sc = new JavaSparkContext(conf);

куда MyKryoRegistrator регистрирует сериализатор для MyCustomClass:

public void registerClasses(Kryo kryo) {
    kryo.register(MyCustomClass.class, new MyCustomClassSerializer());
}

Затем я читаю мой файл данных:

JavaPairRDD<MyCustomClass, NullWritable> records =
                sc.newAPIHadoopFile("file:/path/to/datafile.avro",
                AvroKeyInputFormat.class, MyCustomClass.class, NullWritable.class,
                sc.hadoopConfiguration());
Tuple2<MyCustomClass, NullWritable> first = records.first();

Кажется, это работает нормально, но с помощью отладчика я вижу, что, хотя в RDD есть kClassTag my.package.conisting.MyCustomClass, переменная first содержит Tuple2<AvroKey, NullWritable>не Tuple2<MyCustomClass, NullWritable>! И действительно, когда выполняется следующая строка:

System.out.println("Got a result, custom field is: " + first._1.getSomeCustomField());

Я получаю исключение:

java.lang.ClassCastException: org.apache.avro.mapred.AvroKey cannot be cast to my.package.containing.MyCustomClass

Я делаю что-то неправильно? И даже если я не получу ошибку компиляции, а не ошибку времени выполнения?

1 ответ

*************РЕДАКТИРОВАТЬ**************

Мне удалось загрузить пользовательские объекты из файлов avro и создать GitHub-репозиторий с кодом. Однако если avro lib не удается загрузить данные в пользовательский класс, вместо этого он возвращает объекты GenericData$Record. И в этом случае Spark Java API не проверяет присвоение пользовательскому классу, поэтому вы получаете исключение ClassCastException только при попытке получить доступ к данным AvroKey. Это нарушение гарантии безопасности данных.

*************РЕДАКТИРОВАТЬ**************

Для всех, кто пытается это сделать, у меня есть способ обойти эту проблему, но это не может быть правильным решением: я создал класс для чтения GenericData.Record из файлов avro:

public class GenericRecordFileInputFormat extends FileInputFormat<GenericData.Record, NullWritable> {
    private static final Logger LOG = LoggerFactory.getLogger(GenericRecordFileInputFormat.class);

    /**
     * {@inheritDoc}
     */
    @Override
    public RecordReader<GenericData.Record, NullWritable> createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        Schema readerSchema = AvroJob.getInputKeySchema(context.getConfiguration());
        if (null == readerSchema) {
            LOG.warn("Reader schema was not set. Use AvroJob.setInputKeySchema() if desired.");
            LOG.info("Using a reader schema equal to the writer schema.");
        }
        return new GenericDataRecordReader(readerSchema);
    }


    public static class GenericDataRecordReader extends RecordReader<GenericData.Record, NullWritable> {

        AvroKeyRecordReader<GenericData.Record> avroReader;

        public GenericDataRecordReader(Schema readerSchema) {
            super();
            avroReader = new AvroKeyRecordReader<>(readerSchema);
        }

        @Override
        public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            avroReader.initialize(inputSplit, taskAttemptContext);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            return avroReader.nextKeyValue();
        }

        @Override
        public GenericData.Record getCurrentKey() throws IOException, InterruptedException {
            AvroKey<GenericData.Record> currentKey = avroReader.getCurrentKey();
            return currentKey.datum();
        }

        @Override
        public NullWritable getCurrentValue() throws IOException, InterruptedException {
            return avroReader.getCurrentValue();
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return avroReader.getProgress();
        }

        @Override
        public void close() throws IOException {
            avroReader.close();
        }
    }
}

Затем я загружаю записи:

JavaRDD<GenericData.Record> records = sc.newAPIHadoopFile("file:/path/to/datafile.avro",
                GenericRecordFileInputFormat.class, GenericData.Record.class, NullWritable.class,
                sc.hadoopConfiguration()).keys();

Затем я конвертирую записи в свой пользовательский класс, используя конструктор, который принимает GenericData.Record,

Опять же - не красиво, но работает.

Другие вопросы по тегам