Как установить неизменяемый коллекционный сериализатор Kryo в коде Spark

Я использую сериализацию Kryo в Spark (v1.6.1) в Java и при сериализации класса, который имеет коллекцию в своем поле, он выдает следующую ошибку:

Caused by: java.lang.UnsupportedOperationException
         at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
         at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:102)
         at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:18)
         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
         at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
         ... 27 more

Я обнаружил, что это потому, что по умолчанию CollectionSerializer Kryo не может десериализовать коллекцию, потому что она не модифицируема, и вместо этого мы должны использовать UnmodifiableCollectionsSerializer.

Как конкретно упомянуть в коде спарк для использования UnmodifiableCollectionsSerializer для Kryo?

Моя текущая конфигурация -

SparkConf conf = new SparkConf().setAppName("ABC");
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(new Class<?>[] {*list of classes I want to register*});

1 ответ

Решение

В случае, если кто-то еще столкнется с этой проблемой, вот решение - я получил его с помощью сериализаторов javakaffee kryo.

Добавьте следующую зависимость maven:

<dependency>
        <groupId>de.javakaffee</groupId>
        <artifactId>kryo-serializers</artifactId>
        <version>0.42</version>
</dependency>

Написать собственный регистратор крио для регистрации UnmodifiableCollectionsSerializer

    public class CustomKryoRegistrator implements KryoRegistrator {
        @Override
        public void registerClasses(Kryo kryo) {        
             UnmodifiableCollectionsSerializer.registerSerializers(kryo);
        }
   }

Установите spark.kryo.registrator на полное имя пользовательского регистратора

conf.set("spark.kryo.registrator", "com.abc.CustomKryoRegistrator");

Рекомендации -

https://github.com/magro/kryo-serializers

Spark Kryo: зарегистрировать пользовательский сериализатор

Другие вопросы по тегам