Spark java.lang.OutOfMemoryError: пространство кучи Java

Мой кластер: 1 ведущий, 11 ведомых, каждый узел имеет 6 ГБ памяти.

Мои настройки:

spark.executor.memory=4g, Dspark.akka.frameSize=512

Вот проблема:

Сначала я прочитал некоторые данные (2,19 ГБ) из HDFS в RDD:

val imageBundleRDD = sc.newAPIHadoopFile(...)

Во-вторых, сделайте что-нибудь на этом RDD:

val res = imageBundleRDD.map(data => {
                               val desPoints = threeDReconstruction(data._2, bg)
                                 (data._1, desPoints)
                             })

Наконец, вывод в HDFS:

res.saveAsNewAPIHadoopFile(...)

Когда я запускаю свою программу, она показывает:

.....
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:24 as TID 33 on executor 9: Salve7.Hadoop (NODE_LOCAL)
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:24 as 30618515 bytes in 210 ms
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:36 as TID 34 on executor 2: Salve11.Hadoop (NODE_LOCAL)
14/01/15 21:42:28 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:36 as 30618515 bytes in 449 ms
14/01/15 21:42:28 INFO cluster.ClusterTaskSetManager: Starting task 1.0:32 as TID 35 on executor 7: Salve4.Hadoop (NODE_LOCAL)
Uncaught error from thread [spark-akka.actor.default-dispatcher-3] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[spark]
java.lang.OutOfMemoryError: Java heap space

Есть слишком много задач?

PS: все нормально, когда входные данные составляют около 225 МБ.

Как я могу решить эту проблему?

14 ответов

У меня есть несколько предложений:

  • Если ваши узлы настроены на максимум 6 г для Spark (и оставляют немного для других процессов), тогда используйте 6 г, а не 4 г, spark.executor.memory=6g, Убедитесь, что вы используете как можно больше памяти, проверив пользовательский интерфейс (он скажет, сколько памяти вы используете)
  • Попробуйте использовать больше разделов, у вас должно быть 2 - 4 на процессор. IME, увеличивая количество разделов, часто является самым простым способом сделать программу более стабильной (и часто более быстрой). Для огромных объемов данных вам может потребоваться более 4 на процессор, в некоторых случаях мне приходилось использовать 8000 разделов!
  • Уменьшите долю памяти, зарезервированную для кэширования, используя spark.storage.memoryFraction, Если вы не используете cache() или же persist в вашем коде это также может быть 0. По умолчанию это 0,6, что означает, что вы получаете только 0,4 * 4 г памяти для своей кучи. Уменьшение IME часто приводит к тому, что OOM исчезают. ОБНОВЛЕНИЕ: Начиная с версии 1.6, очевидно, нам больше не нужно играть с этими значениями, их будет определять автоматически.
  • Похоже на вышеприведенное, но тасуется доля памяти. Если вашей работе не требуется много памяти в случайном порядке, установите для нее более низкое значение (это может привести к тому, что ваши перемешивания будут перетекать на диск, что может иметь катастрофические последствия для скорости). Иногда, когда это операция случайного воспроизведения, которая требует OOMing, вам нужно сделать обратное, то есть установить для нее что-то большое, например, 0,8, или убедиться, что ваши перемешивание перетекают на диск (это значение по умолчанию с 1.0.0).
  • Остерегайтесь утечек памяти, они часто вызваны случайным закрытием объектов, которые вам не нужны в ваших лямбдах. Способ диагностики состоит в том, чтобы искать в журналах "задачу, сериализованную в байты XXX". Если размер XXX превышает несколько килобайт или больше, чем MB, возможно, имеется утечка памяти. См. /questions/20545151/zadacha-ne-serializuema-javaionotserializableexception-pri-vyizove-funktsii-vne-zamyikaniya-tolko-dlya-klassov-a-ne-obektov/20545161#20545161
  • Связанные с выше; используйте широковещательные переменные, если вам действительно нужны большие объекты.
  • Если вы кэшируете большие RDD и можете пожертвовать временем доступа, рассмотрите возможность сериализации RDD http://spark.apache.org/docs/latest/tuning.html. Или даже кешировать их на диск (что иногда не так уж и плохо при использовании SSD).
  • (Дополнительно) В связи с вышеизложенным, избегайте String и сильно вложенные структуры (например, Map и вложенные тематические классы). Если возможно, старайтесь использовать только примитивные типы и индексировать все не примитивы, особенно если вы ожидаете много дубликатов. выберите WrappedArray по вложенным структурам, когда это возможно. Или даже разверните свою собственную сериализацию - у вас будет большая информация о том, как эффективно вернуть ваши данные в байты, использовать его!
  • (немного странно) Опять при кэшировании рассмотрите возможность использования Dataset кэшировать вашу структуру, так как она будет использовать более эффективную сериализацию. Это следует рассматривать как хак по сравнению с предыдущим пунктом пули. Встраивание знаний о предметной области в ваш алгоритм / сериализацию может минимизировать объем памяти / кэш-памяти в 100 или 1000 раз, тогда как все Dataset скорее всего даст 2x - 5x в памяти и 10x сжатый (паркет) на диске.

http://spark.apache.org/docs/1.2.1/configuration.html

РЕДАКТИРОВАТЬ: (так что я могу гуглить себя проще) Следующее также указывает на эту проблему:

java.lang.OutOfMemoryError : GC overhead limit exceeded

Чтобы добавить к этому пример использования, который часто не обсуждается, я предложу решение при отправке Spark приложение через spark-submit в локальном режиме.

Согласно справочнику Мастеринг Apache Spark от Яцека Ласковского:

Вы можете запустить Spark в локальном режиме. В этом нераспределенном режиме развертывания с одной JVM Spark порождает все исполнительные компоненты - драйвер, исполнитель, серверную часть и мастер - в одной и той же JVM. Это единственный режим, в котором драйвер используется для выполнения.

Таким образом, если вы испытываете OOM ошибки с heapдостаточно настроить driver-memory а не executor-memory,

Вот пример:

spark-1.6.1/bin/spark-submit
  --class "MyClass"
  --driver-memory 12g
  --master local[*] 
  target/scala-2.10/simple-project_2.10-1.0.jar 

Вы должны настроить параметры памяти offHeap, как показано ниже:

`val spark = SparkSession
     .builder()
     .master("local[*]")
     .config("spark.executor.memory", "70g")
     .config("spark.driver.memory", "50g")
     .config("spark.memory.offHeap.enabled",true)
     .config("spark.memory.offHeap.size","16g")   
     .appName("sampleCodeForReference")
     .getOrCreate()`

Предоставьте память водителя и исполнителя в соответствии с доступностью оперативной памяти вашей машины. Вы можете увеличить размер offHeap, если вы все еще сталкиваетесь с проблемой OutofMemory.

Вы должны увеличить память водителя. В вашей папке $SPARK_HOME/conf вы должны найти файл spark-defaults.conf, отредактируйте и установите spark.driver.memory 4000m в зависимости от памяти вашего хозяина, я думаю. Это то, что исправило проблему для меня, и все идет гладко

Посмотрите на сценарии запуска, где установлен размер кучи Java, похоже, что вы не устанавливали это до запуска Spark worker.

# Set SPARK_MEM if it isn't already set since we also use it for this process
SPARK_MEM=${SPARK_MEM:-512m}
export SPARK_MEM

# Set JAVA_OPTS to be able to load native libraries and to set heap size
JAVA_OPTS="$OUR_JAVA_OPTS"
JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH"
JAVA_OPTS="$JAVA_OPTS -Xms$SPARK_MEM -Xmx$SPARK_MEM"

Вы можете найти документацию по развертыванию скриптов здесь.

Я сильно пострадал от этой проблемы, мы используем динамическое распределение ресурсов, и я подумал, что он будет использовать ресурсы моего кластера для наилучшего соответствия приложению.

Но правда в том, что динамическое распределение ресурсов не устанавливает память драйвера и сохраняет ее по умолчанию, равную 1g.

Я решил эту проблему, установив в spark.driver.memory число, соответствующее памяти моего драйвера (для оперативной памяти 32 ГБ я установил 18 ГБ)

Вы можете установить его, используя команду spark submit:

spark-submit --conf spark.driver.memory=18gb ....cont

Очень важное примечание, это свойство не будет учитываться, если вы установите его из кода, в соответствии с документацией spark:

Свойства Spark в основном можно разделить на два вида: один относится к развертыванию, например "spark.driver.memory", "spark.executor.instances", на этот тип свойств нельзя повлиять при программной настройке через SparkConf во время выполнения или поведение зависит от того, какой менеджер кластера и режим развертывания вы выберете, поэтому было бы предложено установить его через конфигурационный файл или параметры командной строки spark-submit; другая в основном связана с управлением во время выполнения Spark, например "spark.task.maxFailures", этот тип свойств может быть установлен любым способом.

Вообще говоря, память Spark Executor JVM можно разделить на две части. Искровая память и Пользовательская память. Это контролируется собственностью spark.memory.fraction - значение находится в диапазоне от 0 до 1. При работе с изображениями или при интенсивной обработке памяти в искровых приложениях рассмотрите возможность уменьшения spark.memory.fraction, Это сделает больше памяти доступной для работы вашего приложения. Spark может разлиться, поэтому он все равно будет работать с меньшим объемом памяти.

Вторая часть проблемы - разделение труда. Если возможно, разделите ваши данные на более мелкие куски. Меньшие данные, возможно, требуют меньше памяти. Но если это невозможно, вы жертвуете вычислениями на память. Обычно один исполнитель будет работать с несколькими ядрами. Всего памяти исполнителей должно быть достаточно для обработки требований к памяти для всех одновременных задач. Если увеличение памяти исполнителя невозможно, вы можете уменьшить количество ядер для каждого исполнителя, чтобы каждая задача получала больше памяти для работы. Протестируйте с 1 исполнителями ядра, которые имеют максимально возможную память, которую вы можете дать, а затем продолжайте увеличивать количество ядер, пока не найдете наилучшее число ядер.

Расположение для установки размера кучи памяти (по крайней мере в spark-1.0.0) находится в conf/spark-env. Соответствующие переменные SPARK_EXECUTOR_MEMORY & SPARK_DRIVER_MEMORY, Больше документов в руководстве по развертыванию

Также не забудьте скопировать файл конфигурации на все подчиненные узлы.

Вы сбросили свой главный журнал gc? Итак, я встретил аналогичную проблему и обнаружил, что SPARK_DRIVER_MEMORY устанавливает только кучу Xmx. Первоначальный размер кучи остается 1 ГБ, и размер кучи никогда не увеличивается до размера кучи Xmx.

Передача "--conf "spark.driver.extraJavaOptions=-Xms20g решает мою проблему.

ps aux | grep java, и вы увидите следующий журнал:=

24501 30,7 1,7 41782944 2318184 pts / 0 Sl+ 18:49 0:33 /usr/java/latest / bin/java -cp /opt / spark / conf/: / opt / spark / jars / * -Xmx30g -Xms20g

У меня есть несколько предложений для вышеупомянутой ошибки.

● Убедитесь, что память исполнителя, назначенная исполнителю, может иметь дело с разделами, требующими больше памяти, чем назначено.

● Попытайтесь выяснить, работает ли больше тасов, поскольку тасования являются дорогостоящими операциями, поскольку они включают дисковый ввод-вывод, сериализацию данных и сетевой ввод-вывод.

● использовать широковещательные соединения

● Избегайте использования groupByKeys и попробуйте заменить на ReduceByKey

● Избегайте использования огромных Java-объектов везде, где происходит перетасовка

Насколько я понимаю из приведенного выше кода, он загружает файл, выполняет операцию карты и сохраняет его обратно. Нет операции, требующей перемешивания. Кроме того, нет операции, которая требует передачи данных драйверу, поэтому настройка чего-либо, связанного с перемешиванием или драйвером, может не повлиять. У драйвера действительно есть проблемы, когда задач слишком много, но это было только до версии Spark 2.0.2. Могут быть две вещи, которые идут не так.

  • Есть только один или несколько исполнителей. Увеличьте количество исполнителей, чтобы их можно было назначить различным ведомым устройствам. Если вы используете yarn, вам нужно изменить конфигурацию num-Executionors или если вы используете автономную версию Spark, тогда необходимо настроить количество ядер для каждого исполнителя и настроить максимальное количество ядер conf. В автономных исполнителей количество = максимальное количество ядер / ядер на исполнителя.
  • Количество разделов очень мало или может быть всего один. Так что, если это мало, даже если у нас есть многоядерные процессоры, несколько исполнителей, это не принесет большой пользы, поскольку распараллеливание зависит от количества разделов. Итак, увеличьте количество разделов, выполнив imageBundleRDD.repartition(11)

Просто, если вы используете скрипт или блокнот juyter, а затем установите только путь конфигурации при запуске сеанса искры...

      spark = SparkSession.builder.master('local[*]').config("spark.driver.memory", "15g").appName('testing').getOrCreate()

Ошибки пространства кучи обычно возникают из-за передачи слишком большого количества данных драйверу или исполнителю. В вашем коде не похоже, что вы что-то возвращаете драйверу, но вместо этого вы можете перегружать исполнителей, которые сопоставляют входную запись / строку с другой с помощью метода threeDReconstruction(). Я не уверен, что находится в определении метода, но это определенно вызывает перегрузку исполнителя. Теперь у вас есть 2 варианта,

  1. отредактируйте свой код, чтобы сделать 3-D реконструкцию более эффективным способом.
  2. не редактируйте код, но дайте больше памяти вашим исполнителям, а также увеличьте накладные расходы на память. [spark.executor.memory или spark.driver.memoryOverhead]

Я бы посоветовал быть осторожным с увеличением и использовать ровно столько, сколько вам нужно. Каждое задание уникально с точки зрения требований к памяти, поэтому я бы посоветовал эмпирически попробовать разные значения, увеличивая каждый раз степень 2 (256M,512M,1G и т. Д.)

Вы получите значение памяти исполнителя, которое будет работать. Попробуйте повторно запустить задание с этим значением 3 или 5 раз, прежде чем устанавливать эту конфигурацию.

Установка этих точных конфигураций помогла решить проблему.

spark-submit --conf spark.yarn.maxAppAttempts=2 --executor-memory 10g --num-executors 50 --driver-memory 12g
Другие вопросы по тегам