Исключение сериализации на искре

У меня на Spark очень странная проблема с сериализацией. Код как ниже:

class PLSA(val sc : SparkContext, val numOfTopics : Int) extends Serializable
{
    def infer(document: RDD[Document]): RDD[DocumentParameter] = {
      val docs = documents.map(doc => DocumentParameter(doc, numOfTopics))
      docs
    }
}

где Документ определяется как:

class Document(val tokens: SparseVector[Int]) extends Serializable

и DocumentParameter это:

class DocumentParameter(val document: Document, val theta: Array[Float]) extends Serializable

object DocumentParameter extends Serializable
{
  def apply(document: Document, numOfTopics: Int) = new DocumentParameter(document, 
    Array.ofDim[Float](numOfTopics))
}

SparseVector - это сериализуемый класс в breeze.linalg.SparseVector,

Это простая процедура сопоставления, и все классы сериализуемы, но я получаю следующее исключение:

org.apache.spark.SparkException: Task not serializable

Но когда я удаляю numOfTopics параметр, то есть:

object DocumentParameter extends Serializable
{
  def apply(document: Document) = new DocumentParameter(document, 
    Array.ofDim[Float](10))
}

и назовите это так:

val docs = documents.map(DocumentParameter.apply)

и вроде нормально.

Тип Int не сериализуем? Но я вижу, что некоторый код написан так.

Я не уверен, как исправить эту ошибку.

# ОБНОВЛЕНО #:

Спасибо @samthebest. Я добавлю больше деталей об этом.

stack trace:
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
    at org.apache.spark.rdd.RDD.map(RDD.scala:270)
    at com.topicmodel.PLSA.infer(PLSA.scala:13)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:37)
    at $iwC$$iwC$$iwC.<init>(<console>:39)
    at $iwC$$iwC.<init>(<console>:41)
    at $iwC.<init>(<console>:43)
    at <init>(<console>:45)
    at .<init>(<console>:49)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
    at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 46 more

Поскольку трассировка стека дает общую информацию об исключении, я ее удалил.

Я запускаю код в спарк-оболочке.

// suppose I have get RDD[Document] for docs
val numOfTopics = 100
val plsa = new PLSA(sc, numOfTopics)
val docPara = plsa.infer(docs)

Не могли бы вы дать мне несколько уроков или советов по сериализации?

2 ответа

Решение

Анонимные функции сериализуют свой содержащий класс. Когда ты map {doc => DocumentParameter(doc, numOfTopics)}, единственный способ, которым это может дать этой функции доступ к numOfTopics это сериализовать PLSA учебный класс. И этот класс на самом деле не может быть сериализован, потому что (как вы можете видеть из трассировки стека) он содержит SparkContext который не сериализуем (плохие вещи могут произойти, если отдельные узлы кластера имеют доступ к контексту и могут, например, создавать новые задания из картографа).

В общем, старайтесь не хранить SparkContext в ваших классах (отредактируйте: или, по крайней мере, убедитесь, что очень ясно, какие классы содержат SparkContext а какого рода нет); лучше передать как (возможно implicit) параметр для отдельных методов, которые нуждаются в этом. Или переместите функцию {doc => DocumentParameter(doc, numOfTopics)} в другой класс от PLSAтот, который действительно может быть сериализован.

(Как предположили несколько человек, можно сохранить SparkContext в классе, но помечены как @transient так что это не будет сериализовано. Я не рекомендую этот подход; это означает, что класс "волшебным образом" изменит состояние при сериализации (потеря SparkContext), и поэтому вы можете получить NPE при попытке доступа к SparkContext изнутри сериализованной работы. Лучше поддерживать четкое различие между классами, которые используются только в "контрольном" коде (и могут использовать SparkContext) и классы, которые сериализуются для запуска в кластере (которые не должны иметь SparkContext)).

Это действительно странный вопрос, но я думаю, что могу догадаться о проблеме. Но, во-первых, вы не предоставили минимум для решения проблемы (я могу догадаться, потому что я видел сотни из них раньше). Вот некоторые проблемы с вашим вопросом:

def infer(document: RDD[Document], numOfTopics: Int): RDD[DocumentParameter] = {
  val docs = documents.map(doc => DocumentParameter(doc, numOfTopics))
}

Этот метод не возвращает RDD[DocumentParameter] это возвращается Unit, Вы, должно быть, скопировали и вставили код неправильно.

Во-вторых, вы не предоставили всю трассировку стека? Зачем? Нет никакой причины НЕ предоставлять полную трассировку стека, а полная трассировка стека с сообщением необходима для понимания ошибки - нужно понять всю ошибку, чтобы понять, что это за ошибка. Обычно не сериализуемое исключение говорит вам, что не сериализуемо.

В-третьих, вы не сказали нам, где метод infer Вы делаете это в оболочке? Что такое содержащий объект / класс / признак и т. Д. infer?

Во всяком случае, я собираюсь догадаться, что, передав в Int вы заставили сериализовать серию вещей, которую вы не ожидаете, я не могу дать вам больше информации, пока вы не предоставите минимальный код, чтобы мы могли полностью понять вашу проблему.

Другие вопросы по тегам