Искру исполнителю не хватает памяти в join и reduByKey
В spark2.0 у меня есть два фрейма данных, и мне нужно сначала присоединиться к ним и сделать lowerByKey для агрегирования данных. Я всегда получал ООМ в исполнителе. Заранее спасибо.
Данные
d1 (1G, 500 миллионов строк, кэшировано, разделено на col id2)
id1 id2
1 1
1 3
1 4
2 0
2 7
...
d2 (160G, 2 миллиона строк, кэшированные, разделенные по col id2, значение col содержит список из 5000 чисел с плавающей запятой)
id2 value
0 [0.1, 0.2, 0.0001, ...]
1 [0.001, 0.7, 0.0002, ...]
...
Теперь мне нужно объединить две таблицы, чтобы получить d3, и я использую spark.sql
select d1.id1, d2.value
FROM d1 JOIN d2 ON d1.id2 = d2.id2
Затем я делаю ReduceByKey для d3 и собираю значения для каждого id1 в таблице d1.
d4 = d3.rdd.reduceByKey(lambda x, y: numpy.add(x, y)) \
.mapValues(lambda x: (x / numpy.linalg.norm(x, 1)).toList)\
.toDF()
Я сделал оценку, что размер d4 будет 340G. Теперь я использую на машине r3.8xlarge, чтобы запустить работу
mem: 244G
cpu: 64
Disk: 640G
Вопросы
Я играл с некоторыми конфигурациями, но я всегда получал OOM в исполнителе. Итак, вопросы
Возможно ли выполнить эту работу на текущем типе машины? или я должен просто использовать большую машину (насколько большой?). Но я помню, что я сталкивался со статьями / блогами, в которых говорилось, что они обрабатывают терабайты на относительно небольших машинах.
Какое улучшение я должен сделать? например, конфигурация искры, оптимизация кода?
Можно ли оценить объем памяти, необходимый для каждого исполнителя?
Конфигурация искры
Некоторые конфигурации Spark, которые я пробовал
config1:
--verbose
--conf spark.sql.shuffle.partitions=200
--conf spark.dynamicAllocation.enabled=false
--conf spark.driver.maxResultSize=24G
--conf spark.shuffle.blockTransferService=nio
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
--conf spark.kryoserializer.buffer.max=2000M
--conf spark.rpc.message.maxSize=800
--conf "spark.executor.extraJavaOptions=-verbose:gc - XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:MetaspaceSize=100M"
--num-executors 4
--executor-memory 48G
--executor-cores 15
--driver-memory 24G
--driver-cores 3
config2:
--verbose
--conf spark.sql.shuffle.partitions=10000
--conf spark.dynamicAllocation.enabled=false
--conf spark.driver.maxResultSize=24G
--conf spark.shuffle.blockTransferService=nio
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
--conf spark.kryoserializer.buffer.max=2000M
--conf spark.rpc.message.maxSize=800
--conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:MetaspaceSize=100M"
--num-executors 4
--executor-memory 48G
--executor-cores 15
--driver-memory 24G
--driver-cores 3
конфиг 3:
--verbose
--conf spark.sql.shuffle.partitions=10000
--conf spark.dynamicAllocation.enabled=true
--conf spark.driver.maxResultSize=6G
--conf spark.shuffle.blockTransferService=nio
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
--conf spark.kryoserializer.buffer.max=2000M
--conf spark.rpc.message.maxSize=800
--conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:MetaspaceSize=100M"
--executor-memory 6G
--executor-cores 2
--driver-memory 6G
--driver-cores 3
конфиг 4:
--verbose
--conf spark.sql.shuffle.partitions=20000
--conf spark.dynamicAllocation.enabled=false
--conf spark.driver.maxResultSize=6G
--conf spark.shuffle.blockTransferService=nio
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
--conf spark.kryoserializer.buffer.max=2000M
--conf spark.rpc.message.maxSize=800
--conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:MetaspaceSize=100M"
--num-executors 13
--executor-memory 15G
--executor-cores 5
--driver-memory 13G
--driver-cores 5
ОШИБКА
OOM Error1 от исполнителя
ExecutorLostFailure (executor 14 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 9.1 GB of 9 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
Heap
PSYoungGen total 1830400K, used 1401721K [0x0000000740000000, 0x00000007be900000, 0x00000007c0000000)
eden space 1588736K, 84% used [0x0000000740000000,0x0000000791e86980,0x00000007a0f80000)
from space 241664K, 24% used [0x00000007af600000,0x00000007b3057de8,0x00000007be200000)
to space 236032K, 0% used [0x00000007a0f80000,0x00000007a0f80000,0x00000007af600000)
ParOldGen total 4194304K, used 4075884K [0x0000000640000000, 0x0000000740000000, 0x0000000740000000)
object space 4194304K, 97% used [0x0000000640000000,0x0000000738c5b198,0x0000000740000000)
Metaspace used 59721K, capacity 60782K, committed 61056K, reserved 1101824K
class space used 7421K, capacity 7742K, committed 7808K, reserved 1048576K
OOM Error2 от исполнителя
ExecutorLostFailure (executor 7 exited caused by one of the running tasks) Reason: Container marked as failed: container_1477662810360_0002_01_000008 on host: ip-172-18-9-130.ec2.internal. Exit status: 52. Diagnostics: Exception from container-launch.
Heap
PSYoungGen total 1968128K, used 1900544K [0x0000000740000000, 0x00000007c0000000, 0x00000007c0000000)
eden space 1900544K, 100% used [0x0000000740000000,0x00000007b4000000,0x00000007b4000000)
from space 67584K, 0% used [0x00000007b4000000,0x00000007b4000000,0x00000007b8200000)
to space 103936K, 0% used [0x00000007b9a80000,0x00000007b9a80000,0x00000007c0000000)
ParOldGen total 4194304K, used 4194183K [0x0000000640000000, 0x0000000740000000, 0x0000000740000000)
object space 4194304K, 99% used [0x0000000640000000,0x000000073ffe1f38,0x0000000740000000)
Metaspace used 59001K, capacity 59492K, committed 61056K, reserved 1101824K
class space used 7300K, capacity 7491K, committed 7808K, reserved 1048576K
Ошибка из контейнера
16/10/28 14:33:21 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
16/10/28 14:33:26 ERROR Utils: Uncaught exception in thread stdout writer for python
java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:504)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$doExecute$3$$anon$2.hasNext(WholeStageCodegenExec.scala:386)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:120)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:112)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1877)
at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
16/10/28 14:33:36 ERROR Utils: Uncaught exception in thread driver-heartbeater
16/10/28 14:33:26 ERROR Utils: Uncaught exception in thread stdout writer for python
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.lang.Double.valueOf(Double.java:519)
at org.apache.spark.sql.catalyst.expressions.UnsafeArrayData.get(UnsafeArrayData.java:138)
at org.apache.spark.sql.catalyst.util.ArrayData.foreach(ArrayData.scala:135)
at org.apache.spark.sql.execution.python.EvaluatePython$.toJava(EvaluatePython.scala:64)
at org.apache.spark.sql.execution.python.EvaluatePython$.toJava(EvaluatePython.scala:57)
at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2517)
at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2517)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:121)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:112)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1877)
at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
16/10/28 14:33:43 ERROR SparkUncaughtExceptionHandler: [Container in shutdown] Uncaught exception in thread Thread[stdout writer for python,5,main]
Обновление 1
Похоже, данные d1 довольно искажены, если я разделю на id2. В результате объединение вызовет OOM. Если d1 был распределен равномерно, как я думал раньше, приведенная выше конфигурация должна работать.
Обновление 2
Я опубликовал свои попытки решить проблему на случай, если кто-то также столкнется с подобными проблемами.
Attempt1
Моя проблема в том, что если я разделю d1 по id2, то данные будут довольно искажены. В результате существуют некоторые разделы, которые содержат почти все id1. Следовательно, соединение с d2 вызовет ошибку OOM. Чтобы смягчить такую проблему, я сначала идентифицирую подмножество s
из id2, который может вызвать такие перекос данных, если разделение по id2. Затем я создаю d5 из d2, включая только s
и d6 из d2, исключая s
, К счастью, размер d5 не слишком велик. Итак, я могу транслировать соединение d1 с d5. Тогда я присоединяюсь к d1 и d6. Затем я объединяю два результата и делаю reduByKey. Я очень близок к решению проблемы. Я не продолжил в том же духе, потому что в дальнейшем мой d1 мог значительно вырасти. Другими словами, этот подход не очень масштабируемый для меня
Attempt2
К счастью, в моем случае большинство значений в d2 очень мало. Основываясь на моем приложении, я могу безопасно удалить небольшие значения и преобразовать вектор в sparseVector, чтобы значительно уменьшить размер d2. После этого я делю d1 на id1 и транслирую соединение d2 (после удаления небольших значений). Конечно, нужно увеличить память драйвера, чтобы позволить относительно большую широковещательную переменную. Это работает для меня и также масштабируется для моего приложения.
2 ответа
Вот что можно попробовать: немного уменьшить размер исполнителя. В настоящее время вы получили:
--executor-memory 48G
--executor-cores 15
Дайте этому попробовать:
--executor-memory 16G
--executor-cores 5
Меньший размер исполнителя кажется оптимальным по ряду причин. Одним из них является то, что размер кучи Ja va больше 32G приводит к тому, что ссылки на объекты увеличиваются с 4 байтов до 8, и все требования к памяти увеличиваются.
Редактировать: проблема может заключаться в том, что разделы d4 слишком велики (хотя другой совет по-прежнему применим!). Вы можете решить эту проблему, перераспределив d3 на большее количество разделов (примерно d1 * 4), или передав его numPartitions
необязательный аргумент reduceByKey
, Обе эти опции вызовут случайное перемешивание, но это лучше, чем сбой.
Я получил ту же проблему, но я искал много ответов, которые не могут решить мою проблему. в конце концов, я отлаживаю свой код шаг за шагом. Я считаю, что проблема, вызванная размером данных, не сбалансирована для каждого раздела. просто делать df_rdd.repartition(nums)