Задача не сериализуема: java.io.NotSerializableException при вызове функции вне замыкания только для классов, а не объектов
Странное поведение при вызове функции вне замыкания:
- когда функция находится в объекте, все работает
- когда функция находится в классе get:
Задача не сериализуема: java.io.NotSerializableException: тестирование
Проблема в том, что мне нужен мой код в классе, а не объект. Есть идеи, почему это происходит? Сериализуется ли объект Scala (по умолчанию?)?
Это пример рабочего кода:
object working extends App {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
//calling function outside closure
val after = rddList.map(someFunc(_))
def someFunc(a:Int) = a+1
after.collect().map(println(_))
}
Это нерабочий пример:
object NOTworking extends App {
new testing().doIT
}
//adding extends Serializable wont help
class testing {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
def doIT = {
//again calling the fucntion someFunc
val after = rddList.map(someFunc(_))
//this will crash (spark lazy)
after.collect().map(println(_))
}
def someFunc(a:Int) = a+1
}
11 ответов
Я не думаю, что другой ответ является полностью правильным. СДР действительно сериализуемы, так что это не то, что приводит к сбою вашей задачи.
Spark - это механизм распределенных вычислений, и его основной абстракцией является эластичный распределенный набор данных (RDD), который можно рассматривать как распределенную коллекцию. По сути, элементы RDD распределены по узлам кластера, но Spark абстрагирует их от пользователя, позволяя пользователю взаимодействовать с RDD (коллекцией), как если бы он был локальным.
Не вдаваться в подробности, но когда вы выполняете различные преобразования на СДР (map
, flatMap
, filter
и другие), ваш код преобразования (замыкание):
- сериализовано на узле драйвера,
- отправлены на соответствующие узлы в кластере,
- десериализации,
- и, наконец, выполняется на узлах
Конечно, вы можете запустить это локально (как в вашем примере), но все эти фазы (кроме доставки по сети) все же происходят. [Это позволяет выявлять любые ошибки еще до развертывания в производство]
Во втором случае происходит то, что вы вызываете метод, определенный в классе testing
изнутри функции карты. Spark видит это, и поскольку методы не могут быть сериализованы сами по себе, Spark пытается сериализовать все testing
класс, так что код будет по-прежнему работать при выполнении в другой JVM. У вас есть две возможности:
Либо вы делаете тестирование классов сериализуемым, так что весь класс может быть сериализован Spark:
import org.apache.spark.{SparkContext,SparkConf}
object Spark {
val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}
object NOTworking extends App {
new Test().doIT
}
class Test extends java.io.Serializable {
val rddList = Spark.ctx.parallelize(List(1,2,3))
def doIT() = {
val after = rddList.map(someFunc)
after.collect().foreach(println)
}
def someFunc(a: Int) = a + 1
}
или вы делаете someFunc
функция вместо метода (функции - это объекты в Scala), так что Spark сможет ее сериализовать:
import org.apache.spark.{SparkContext,SparkConf}
object Spark {
val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}
object NOTworking extends App {
new Test().doIT
}
class Test {
val rddList = Spark.ctx.parallelize(List(1,2,3))
def doIT() = {
val after = rddList.map(someFunc)
after.collect().foreach(println)
}
val someFunc = (a: Int) => a + 1
}
Подобная, но не та же проблема с сериализацией классов может быть вам интересна, и вы можете прочитать об этом в этой презентации Spark Summit 2013.
Как примечание стороны, вы можете переписать rddList.map(someFunc(_))
в rddList.map(someFunc)
они точно такие же. Обычно второй вариант предпочтительнее, так как он менее подробный и понятный для чтения.
РЕДАКТИРОВАТЬ (2015-03-15): SPARK-5307 представил SerializationDebugger и Spark 1.3.0 является первой версией, которая его использует. Он добавляет путь сериализации в NotSerializableException. Когда встречается NotSerializableException, отладчик посещает граф объектов, чтобы найти путь к объекту, который не может быть сериализован, и создает информацию, чтобы помочь пользователю найти объект.
В случае с OP это то, что выводится на стандартный вывод:
Serialization stack:
- object not serializable (class: testing, value: testing@2dfe2f00)
- field (class: testing$$anonfun$1, name: $outer, type: class testing)
- object (class testing$$anonfun$1, <function1>)
Великий ответ Греги объясняет, почему оригинальный код не работает, и два способа решения проблемы. Однако это решение не очень гибкое; рассмотрим случай, когда ваше закрытие включает вызов методаSerializable
класс, который вы не можете контролировать. Вы также не можете добавить Serializable
пометить этот класс или изменить базовую реализацию, чтобы изменить метод в функцию.
Nilesh предлагает для этого отличный обходной путь, но решение может быть сделано как более кратким, так и общим:
def genMapper[A, B](f: A => B): A => B = {
val locker = com.twitter.chill.MeatLocker(f)
x => locker.get.apply(x)
}
Эта функция-сериализатор может затем использоваться для автоматической упаковки замыканий и вызовов методов:
rdd map genMapper(someFunc)
Эта техника также имеет то преимущество, что не требует дополнительных зависимостей Shark для доступа KryoSerializationWrapper
Так как Chill в Twitter уже запущен ядром Spark
Полный доклад, полностью объясняющий проблему, который предлагает отличный способ смены парадигмы, чтобы избежать этих проблем сериализации: https://github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory-leaks-no-ws.md
Ответ, получивший наибольшее количество голосов, в основном предлагает отбросить целую языковую функцию, которая больше не использует методы и только функции. Действительно, в функциональном программировании следует избегать методов в классах, но превращение их в функции не решает проблему проектирования здесь (см. Ссылку выше).
В качестве быстрого решения в этой конкретной ситуации вы можете просто использовать @transient
аннотация, чтобы сказать это не пытаться сериализовать оскорбительное значение (здесь, Spark.ctx
пользовательский класс, а не имя, которое следует из Spark'а:
@transient
val rddList = Spark.ctx.parallelize(list)
Вы также можете реструктурировать код так, чтобы rddList жил где-то еще, но это также неприятно.
Будущее, вероятно, споры
В будущем Scala будет включать в себя такие вещи, как "споры", которые должны позволить нам точно контролировать зерно, что точно и не затягивается закрытием. Кроме того, это должно превратить все ошибки случайного извлечения несериализуемых типов (или любых нежелательных значений) в ошибки компиляции, а не сейчас, что является ужасными исключениями времени выполнения / утечками памяти.
http://docs.scala-lang.org/sips/pending/spores.html
Совет по сериализации Kryo
При использовании kyro сделайте так, чтобы регистрация была необходима, это будет означать, что вы получите ошибки вместо утечек памяти:
"Наконец, я знаю, что у kryo есть kryo.setRegistrationOptional (true), но мне очень трудно пытаться выяснить, как его использовать. Когда эта опция включена, кажется, что kryo по-прежнему генерирует исключения, если я не зарегистрировался классы ".
Стратегия регистрации классов с крио
Конечно, это только дает вам контроль уровня типа, а не контроль уровня значения.
... больше идей
Я столкнулся с подобной проблемой, и что я понимаю из ответа Греги:
object NOTworking extends App {
new testing().doIT
}
//adding extends Serializable wont help
class testing {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
def doIT = {
//again calling the fucntion someFunc
val after = rddList.map(someFunc(_))
//this will crash (spark lazy)
after.collect().map(println(_))
}
def someFunc(a:Int) = a+1
}
Ваш метод doIT пытается сериализовать метод someFunc(_), но так как метод не сериализуем, он пытается сериализовать тестирование классов, которое снова не сериализуемо.
Поэтому, чтобы ваш код работал, вы должны определить someFunc внутри метода doIT. Например:
def doIT = {
def someFunc(a:Int) = a+1
//function definition
}
val after = rddList.map(someFunc(_))
after.collect().map(println(_))
}
И если в картину входит несколько функций, то все эти функции должны быть доступны родительскому контексту.
Я решил эту проблему, используя другой подход. Вам просто нужно сериализовать объекты перед прохождением через замыкание, а затем десериализовать. Этот подход просто работает, даже если ваши классы не Serializable, потому что он использует Kryo за кулисами. Все, что вам нужно, это немного карри.;)
Вот пример того, как я это сделал:
def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
(foo: Foo) : Bar = {
kryoWrapper.value.apply(foo)
}
val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _
rdd.flatMap(mapper).collectAsMap()
object Blah(abc: ABC) extends (Foo => Bar) {
def apply(foo: Foo) : Bar = { //This is the real function }
}
Не стесняйтесь делать Blah настолько сложным, насколько вам нужно, класс, объект-компаньон, вложенные классы, ссылки на несколько сторонних библиотек.
KryoSerializationWrapper ссылается на: https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala
Я не совсем уверен, что это относится к Scala, но в Java я решил NotSerializableException
путем рефакторинга моего кода, чтобы закрытие не получило доступ к несериализуемому final
поле.
Методы Scala, определенные в классе, не являются сериализуемыми, методы могут быть преобразованы в функции для решения проблемы сериализации.
Синтаксис метода
def func_name (x String) : String = {
...
return x
}
синтаксис функции
val func_name = { (x String) =>
...
x
}
К вашему сведению, в Spark 2.4 многие из вас, вероятно, столкнутся с этой проблемой. Сериализация Kryo стала лучше, но во многих случаях вы не можете использовать spark.kryo.unsafe=true или наивный сериализатор kryo.
Для быстрого исправления попробуйте изменить следующее в конфигурации Spark
spark.kryo.unsafe="false"
ИЛИ
spark.serializer="org.apache.spark.serializer.JavaSerializer"
Я изменяю настраиваемые преобразования RDD, с которыми сталкиваюсь или пишу лично, используя явные широковещательные переменные и новый встроенный api twitter-chill, конвертируя их из rdd.map(row =>
к rdd.mapPartitions(partition => {
функции.
пример
Старый (не очень) способ
val sampleMap = Map("index1" -> 1234, "index2" -> 2345)
val outputRDD = rdd.map(row => {
val value = sampleMap.get(row._1)
value
})
Альтернативный (лучший) способ
import com.twitter.chill.MeatLocker
val sampleMap = Map("index1" -> 1234, "index2" -> 2345)
val brdSerSampleMap = spark.sparkContext.broadcast(MeatLocker(sampleMap))
rdd.mapPartitions(partition => {
val deSerSampleMap = brdSerSampleMap.value.get
partition.map(row => {
val value = sampleMap.get(row._1)
value
}).toIterator
})
Этот новый способ будет вызывать широковещательную переменную только один раз для каждого раздела, что лучше. Вам все равно придется использовать сериализацию Java, если вы не регистрируете классы.
У меня был подобный опыт.
Ошибка возникла, когда я инициализировал переменную в драйвере (главном), но затем попытался использовать ее на одном из рабочих. Когда это произойдет, Spark Streaming попытается сериализовать объект, чтобы отправить его работнику, и завершится ошибкой, если объект не сериализуемый.
Я решил ошибку, сделав переменную статической.
Предыдущий нерабочий код
private final PhoneNumberUtil phoneUtil = PhoneNumberUtil.getInstance();
Рабочий код
private static final PhoneNumberUtil phoneUtil = PhoneNumberUtil.getInstance();
Кредиты:
def upper(name: String) : String = {
var uppper : String = name.toUpperCase()
uppper
}
val toUpperName = udf {(EmpName: String) => upper(EmpName)}
val emp_details = """[{"id": "1","name": "James Butt","country": "USA"},
{"id": "2", "name": "Josephine Darakjy","country": "USA"},
{"id": "3", "name": "Art Venere","country": "USA"},
{"id": "4", "name": "Lenna Paprocki","country": "USA"},
{"id": "5", "name": "Donette Foller","country": "USA"},
{"id": "6", "name": "Leota Dilliard","country": "USA"}]"""
val df_emp = spark.read.json(Seq(emp_details).toDS())
val df_name=df_emp.select($"id",$"name")
val df_upperName= df_name.withColumn("name",toUpperName($"name")).filter("id='5'")
display(df_upperName)
это приведет к ошибке org.apache.spark.SparkException: задача не сериализуема в org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
Решение -
import java.io.Serializable;
object obj_upper extends Serializable {
def upper(name: String) : String =
{
var uppper : String = name.toUpperCase()
uppper
}
val toUpperName = udf {(EmpName: String) => upper(EmpName)}
}
val df_upperName=
df_name.withColumn("name",obj_upper.toUpperName($"name")).filter("id='5'")
display(df_upperName)
Мое решение состояло в том, чтобы добавить класс compagnion, который обрабатывает все методы, которые нельзя сериализовать внутри класса.