Почему задания Spark не выполняются с org.apache.spark.shuffle.MetadataFetchFailedException: отсутствует выходное местоположение для shuffle 0 в режиме спекуляции?
Я запускаю работу Spark в режиме спекуляции. У меня около 500 заданий и около 500 файлов размером 1 ГБ в сжатом виде. Я продолжаю получать на каждой работе, для 1-2 задач, прилагаемую ошибку, где она повторяется потом десятки раз (мешая завершить работу).
org.apache.spark.shuffle.MetadataFetchFailedException: отсутствует выходное местоположение для перемешивания 0
Есть идеи, в чем смысл проблемы и как ее преодолеть?
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:384)
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:381)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:380)
at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:176)
at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
11 ответов
Это случилось со мной, когда я отдал рабочему узлу больше памяти, чем он имел. Так как у него не было перестановки, искра не работала при попытке сохранить объекты для перестановки без остатка памяти.
Решением было либо добавить подкачку, либо настроить работника / исполнителя на использование меньшего количества памяти в дополнение к использованию уровня хранения MEMORY_AND_DISK для нескольких сохранений.
Ошибка возникает, когда в определенном разделе искры много данных. Чтобы решить эту проблему, выполните следующие действия:
- Увеличьте количество разделов в случайном порядке: --conf spark.sql.shuffle.partitions=<some-high-number-lets say 200>
- В обычных случаях количество разделов должно быть установлено как количество исполнителей * количество ядер на исполнителя. Но такая схема разбиения будет проблематичной, если у нас будет огромное количество данных. См. пример ниже.
Предположим, у нас были следующие данные, и у нас было три исполнителя с 1 ядром каждый, поэтому количество разделов (физических разделов) в этом случае было бы 3
Data: 1,2,3,4,5,6,7,8,9,13,16,19,22
Partitions: 1,2,3
Distribution of Data in Partitions (partition logic based on modulo by 3)
1-> 1,4,7,13,16,19,22
2-> 2,5,8
3->3,6,9
From above we can see that there is data skew, partition 1 is having more
data than the rest
Now lets increase the number of partitions to : number of executors * number
of cores per executor*2 = 6 (in our example. These 6 partitions will be
logical partitions.Now each executor will be having 2 logical partitions
instead of 1 .Data partitioning will be based on modulo 6 instead of 3.
Partitions of data in each executor:
1->(0,1)->1,6,7,13,19
2->(2,3)-->2,3,8,9
3->(4,5)->4,5,16,22
The increase in logical partitions leads to fair partitioning.
Следующее, что вы можете сделать после увеличения количества разделов в случайном порядке, — это уменьшить часть хранения искровой памяти, если вы не сохраняете или не кэшируете какой-либо кадр данных. По умолчанию часть хранения равна 0,5, а часть выполнения также равна 0,5. Чтобы уменьшить часть хранилища, вы можете установить в своей команде spark-submit следующую конфигурацию.
--conf spark.memory.storageFraction=0.3
4.) Помимо двух вышеперечисленных вещей, вы также можете установить служебную память исполнителя. --conf spark.executor.memoryOverhead=2g
This is off-heap memory that is used for Virtual Machine overheads, interned
strings etc.
5.) Помимо этого, вы можете ограничить количество файлов, обрабатываемых в конкретном микропакете, установив maxFilesPerTrigger на меньшее значение, скажем, 10.
У нас была похожая ошибка со Spark, но я не уверен, что она связана с вашей проблемой.
Мы использовали JavaPairRDD.repartitionAndSortWithinPartitions
на 100 ГБ данных, и он продолжал работать так же, как ваше приложение. Затем мы посмотрели журналы Yarn на определенных узлах и обнаружили, что у нас есть какая-то проблема нехватки памяти, поэтому Yarn прервал выполнение. Нашим решением было изменить / добавить spark.shuffle.memoryFraction 0
в .../spark/conf/spark-defaults.conf
, Это позволило нам обрабатывать намного больший (но, к сожалению, не бесконечный) объем данных таким образом.
Я получил ту же проблему на моем 3-х машинном кластере YARN. Я продолжал менять оперативную память, но проблема сохранялась. Наконец я увидел следующие сообщения в журналах:
17/02/20 13:11:02 WARN spark.HeartbeatReceiver: Removing executor 2 with no recent heartbeats: 1006275 ms exceeds timeout 1000000 ms
17/02/20 13:11:02 ERROR cluster.YarnScheduler: Lost executor 2 on 1worker.com: Executor heartbeat timed out after 1006275 ms
и после этого было это сообщение:
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 67
Я изменил свойства в spark-defaults.conf следующим образом:
spark.yarn.scheduler.heartbeat.interval-ms 7200000
spark.executor.heartbeatInterval 7200000
spark.network.timeout 7200000
Это оно! Моя работа успешно завершена после этого.
В веб-интерфейсе Spark, если есть такая информация, как Executors lost
, тогда вам нужно проверить журнал пряжи, убедиться, не убит ли ваш контейнер.
Если контейнер был убит, вероятно, из-за нехватки памяти.
Как найти ключевую информацию в журналах пряжи? Например, могут быть такие предупреждения:
Container killed by YARN for exceeding memory limits. 2.5 GB of 2.5 GB physical memory used.
Consider boosting spark.yarn.executor.memoryOverhead.
В этом случае предлагается увеличить spark.yarn.executor.memoryOverhead
.
Я решил эту ошибку, увеличив выделенную память в executorMemory и driverMemory. Вы можете сделать это в HUE, выбрав Spark Program, которая вызывает проблему, и в свойствах -> Список опций вы можете добавить что-то вроде этого:
--driver-memory 10G --executor-memory 10G --num-executors 50 --executor-cores 2
Конечно, значения параметров будут варьироваться в зависимости от размера вашего кластера и ваших потребностей.
Для меня я делал некоторые работы с большими данными (около 50B строк) и получал нагрузку на лодку
ExternalAppendOnlyUnsafeRowArray:54
- Достигнут порог разлива 4096 рядов, переход наorg.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
В моих логах. Очевидно, что 4096 может быть небольшим при таком размере данных... это привело меня к следующей JIRA:
https://issues.apache.org/jira/browse/SPARK-21595
И в конечном итоге к следующим двум параметрам конфигурации:
spark.sql.windowExec.buffer.spill.threshold
spark.sql.windowExec.buffer.in.memory.threshold
Оба по умолчанию 4096; Я поднял их намного выше (2097152), и сейчас все, кажется, идет хорошо. Я не уверен на 100%, что это то же самое, что и проблема, поднятая здесь, но это другая вещь, которую нужно попробовать.
В моем случае (автономный кластер) было сгенерировано исключение, поскольку файловая система некоторых ведомых Spark была заполнена на 100%. Удаление всего в spark/work
папки рабов решили проблему.
Для меня решение состоит в том, чтобы взглянуть на код и понять, что мы используем перераспределение (1), чтобы сохранить действительно большую таблицу =|.
Я получил ту же проблему, но я искал много ответов, которые не могут решить мою проблему. в конце концов, я отлаживаю свой код шаг за шагом. Я считаю, что проблема, вызванная размером данных, не сбалансирована для каждого раздела, MetadataFetchFailedException
что в map
этап не reduce
этап. просто делать df_rdd.repartition(nums)
до reduceByKey()
Эта проблема о памяти. Вы передаете память, которая недоступна. В команде spark submit есть 3 параметра, касающихся памяти и исполнения. Эти параметры являются
--driver-memory 1200M
--driver-cores 1
--num-executors 1
Максимум памяти водителя n/2
(n
= общая память узла.)
Дайте ядра-ядра n/1000
(n
= память драйвера (в МБ)
Дайте число исполнителей n
(n
= количество узлов у вас есть)
Если вы не дадите правильные параметры в команде spark submit, будет медленный ответ, или это вызовет исключение.