Сериализатор Kryo вызывает исключение для базового класса Scala WrappedArray

Два вопроса, ответ на общий, поможет мне понять, насколько я могу сделать MVCE.

1) Как я могу узнать, как зарегистрировать WrappedArray (и любой другой класс в Scala, который я мог бы использовать)? Нормально ли регистрировать классы из библиотек в Kryo?

и конкретные:

2) Как мне это исправить? (Готов признать, что у меня может быть что-то другое, если это отражает ложную ошибку, поэтому не убивайте себя, пытаясь воспроизвести это)

ПОДРОБНОСТИ

Тестирование программы Spark на Java с использованием наших пользовательских классов, связанных с генетикой и статистикой, в Spark 1.4.1, Scala 2.11.5 со следующими настройками в SparkConf:

// for kyro serializer it wants to register all classes that need to be serialized
Class[] kryoClassArray = new Class[]{DropResult.class, DropEvaluation.class, PrintHetSharing.class};

SparkConf sparkConf = new SparkConf().setAppName("PipeLinkageData")
                <SNIP other settings to declare master>
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                //require registration of all classes with Kryo
                .set("spark.kryo.registrationRequired", "true")
                .registerKryoClasses(kryoClassArray);

Получение этой ошибки (повторяется в конце длинного списка ошибок):

Caused by: java.lang.IllegalArgumentException: Class is not
registered: scala.collection.mutable.WrappedArray$ofRef Note: To
register this class use:
kryo.register(scala.collection.mutable.WrappedArray$ofRef.class);

Но я никогда не называю этот класс из своего кода. Я могу добавить scala.collection.mutable.WrappedArray к kryoClassArray, но это не решает проблему. Если я добавлю scala.collection.mutable.WrappedArray $ ofRef.class (как указано в сообщении об ошибке), это синтаксическая ошибка, то я вижу, что не могу объявить анонимную функцию здесь?

MVCE: Я запустил MVCE, но проблема в том, что для работы с нашими классами требуются внешние библиотеки и файлы текста / данных. Как только я раздеваю наши классы, у меня не возникает проблем. Если бы кто-то мог ответить на общий вопрос, это могло бы помочь мне понять, сколько из MVCE я могу придумать.

Когда я пишу этот вопрос, я получил разрешение на обновление до 1.5.2, посмотрю, есть ли там какие-либо изменения и обновлю вопрос, если так.

Если не считать MVCE, вот мои объявления класса:

public class MVCEPipeLinkageInterface extends LinkageInterface implements Serializable {

class PrintHetSharing implements VoidFunction<DropResult> {

class SparkDoDrop implements Function<Integer, Integer> {

Полные ошибки:

16/01/08 10:54:54 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
16/01/08 10:54:55 INFO SparkDeploySchedulerBackend: Registered executor: AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@155.100.214.138:55646/user/Executor#214759698]) with ID 0
16/01/08 10:54:55 ERROR TaskSetManager: Failed to serialize task 0, not attempting to retry it.
java.io.IOException: java.lang.IllegalArgumentException: Class is not registered: scala.collection.mutable.WrappedArray$ofRef
Note: To register this class use: kryo.register(scala.collection.mutable.WrappedArray$ofRef.class);
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1242)
    at org.apache.spark.rdd.ParallelCollectionPartition.writeObject(ParallelCollectionRDD.scala:51)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
    at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:168)
    at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:467)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:231)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:226)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:295)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:293)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:293)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:293)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:293)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.makeOffers(CoarseGrainedSchedulerBackend.scala:167)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$receiveAndReply$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:143)
    at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:178)
    at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:127)
    at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:198)
    at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:126)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
    at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:93)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalArgumentException: Class is not registered: scala.collection.mutable.WrappedArray$ofRef
Note: To register this class use: kryo.register(scala.collection.mutable.WrappedArray$ofRef.class);

2 ответа

Решение

В Scala вы должны исправить эту проблему, добавив scala.collection.mutable.WrappedArray.ofRef[_] в качестве зарегистрированного класса, как в следующем фрагменте:

conf.registerKryoClasses(
  Array(
    ...
    classOf[Person],
    classOf[Array[Person]],
    ...
    classOf[scala.collection.mutable.WrappedArray.ofRef[_]]
  )
)

Вам не нужно делать все сериализуемым, независимо от того, является ли оно частью клиентской библиотеки или нет. Но вам НЕОБХОДИМО сделать любую лямбду, которая будет действовать на исполнителей, сериализуемой. Они не запускаются на главном узле, поэтому нет способа предотвратить сериализацию (и вы не хотите этого делать, поскольку основная цель Spark - распределенные вычисления).

Для примеров и тому подобного (и если вы еще не совсем поняли концепцию), проверьте официальные документы об этом.

Другие вопросы по тегам