Ошибка ExecutorLostFailure при запуске задачи в Spark

когда я пытаюсь запустить его в этой папке, он каждый раз выдает меня ExecutorLostFailure

Привет, я новичок в Spark. Я пытаюсь запустить задание на Spark 1.4.1 с 8 подчиненными узлами с 11,7 ГБ памяти на каждый диск 3,2 ГБ. Я запускаю задачу Spark с одного из подчиненных узлов (из 8 узлов) (т. Е. С объемом хранилища 0,7 доступно приблизительно 4,8 ГБ на каждом узле) и использую Mesos в качестве диспетчера кластеров. Я использую эту конфигурацию:

spark.master mesos://uc1f-bioinfocloud-vamp-m-1:5050
spark.eventLog.enabled true
spark.driver.memory 6g
spark.storage.memoryFraction 0.7
spark.core.connection.ack.wait.timeout 800
spark.akka.frameSize 50
spark.rdd.compress true

Я пытаюсь запустить наивный байесовский алгоритм Spark MLlib в папке размером около 14 ГБ данных. (Нет проблем, когда я запускаю задачу в папке размером 6 ГБ) Я читаю эту папку из хранилища Google как RDD и задаю 32 в качестве параметра раздела (я также пытался увеличить раздел). Затем используя TF для создания векторного признака и прогнозирования на основе этого. Но когда я пытаюсь запустить его в этой папке, он каждый раз выдает меня ExecutorLostFailure. Я пробовал разные конфигурации, но ничего не помогает. Может быть, я упускаю что-то очень простое, но не могу понять. Любая помощь или предложение будут очень ценными.

Лог это:

   15/07/21 01:18:20 ERROR TaskSetManager: Task 3 in stage 2.0 failed 4 times; aborting job    
15/07/21 01:18:20 INFO TaskSchedulerImpl: Cancelling stage 2    
15/07/21 01:18:20 INFO TaskSchedulerImpl: Stage 2 was cancelled    
15/07/21 01:18:20 INFO DAGScheduler: ResultStage 2 (collect at /opt/work/V2ProcessRecords.py:213) failed in 28.966 s    
15/07/21 01:18:20 INFO DAGScheduler: Executor lost: 20150526-135628-3255597322-5050-1304-S8 (epoch 3)    
15/07/21 01:18:20 INFO BlockManagerMasterEndpoint: Trying to remove executor 20150526-135628-3255597322-5050-1304-S8 from BlockManagerMaster.    
15/07/21 01:18:20 INFO DAGScheduler: Job 2 failed: collect at /opt/work/V2ProcessRecords.py:213, took 29.013646 s    
Traceback (most recent call last):    
  File "/opt/work/V2ProcessRecords.py", line 213, in <module>
    secondPassRDD = firstPassRDD.map(lambda ( name, title,  idval, pmcId, pubDate, article, tags , author, ifSigmaCust, wclass): ( str(name), title,  idval, pmcId, pubDate, article, tags , author, ifSigmaCust , "Yes" if ("PMC" + pmcId) in rddNIHGrant else ("No") , wclass)).collect()    
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 745, in collect    
  File "/usr/local/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__    
  File "/usr/local/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 2.0 failed 4 times, most recent failure: Lost task 3.3 in stage 2.0 (TID 12, vamp-m-2.c.quantum-854.internal): ExecutorLostFailure (executor 20150526-135628-3255597322-5050-1304-S8 lost)    
Driver stacktrace:    
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
        at       org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
        at    org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
        at     scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at     org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
        at    org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

15/07/21 01:18:20 INFO BlockManagerMaster: Removed 20150526-135628-3255597322-5050-1304-S8 successfully in removeExecutor
15/07/21 01:18:20 INFO DAGScheduler: Host added was in lost list earlier:vamp-m-2.c.quantum-854.internal
Jul 21, 2015 1:01:15 AM INFO: parquet.hadoop.ParquetFileReader: Initiating action with parallelism: 5
15/07/21 01:18:20 INFO SparkContext: Invoking stop() from shutdown hook



{"Event":"SparkListenerTaskStart","Stage ID":2,"Stage Attempt ID":0,"Task Info":{"Task ID":11,"Index":6,"Attempt":2,"Launch Time":1437616381852,"Executor ID":"20150526-135628-3255597322-5050-1304-S8","Host":"uc1f-bioinfocloud-vamp-m-2.c.quantum-device-854.internal","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Accumulables":[]}}

{"Событие":"SparkListenerExecutorRemoved","Отметка времени":1437616389696,"ИД исполнителя":"20150526-135628-3255597322-5050-1304-S8","Удаленная причина": "Потерянный исполнитель"} {"Событие":"SparkListenerTaskEnd"," Идентификатор этапа ": 2," Идентификатор этапа ": 0," Тип задачи ":"ResultTask"," Причина конца задачи ":{" Причина ":"ExecutorLostFailure"," Идентификатор исполнителя ":"20150526-135628-3255597322-5050-1304-S8"}," Информация о задаче ":{" Идентификатор задачи ": 11," Индекс ":6," Попытка ": 2," Время запуска ":1437616381852," Идентификатор исполнителя ":" 20150526-135628-3255597322-5050-1304-S8", "Хост":"uc1f-bioinfocloud-вамп-м-2.c.quantum-устройства-854.internal","Местность":"PROCESS_LOCAL","Спекулятивный": false, "Время получения результата": 0, "Время окончания":1437616389697,"Ошибка": истина, "Накопления":[]}} {"Событие":"SparkListenerExecutorAdded","Метка времени":1437616389707,"ИД исполнителя ":" 20150526-135628-3255597322-5050-1304-S8 "," Информация об исполнителе ":{" Хост ":"uc1f-bioinfocloud-vamp-m-2.c.quantum-device-854.internal"," Всего Ядра ": 1," Журналы URL ":{}}} {" Событие ":"SparkListenerTaskStart"," Идентификатор этапа ": 2," Идентификатор попытки этапа ": 0," Информация о задаче ":{"ИД задачи": 12, "Индекс": 6, "Попытка": 3, "Время запуска":1437616389702,"ИД исполнителя":"20150526-135628-3255597322-5050-1304-S8","Хост":"uc1f-bioinfocloud-vamp-m-2.c.quantum-device-854.internal","Locality":"PROCESS_LOCAL","Speculartive":false,"Время получения результата": 0, "Время завершения":0,"Failed":false,"Accumulables":[]}} {"Event":"SparkListenerExecutorRemoved","Timestamp":1437616397743,"ID исполнителя":"20150526-135628-3255597322-5050-1304-S8"," Удаленная причина ":" Потерянный исполнитель "} {" Событие ":" SparkListenerTaskEnd "," Идентификатор этапа ": 2," Идентификатор попытки этапа ": 0," Тип задачи ":"ResultTask"," Причина завершения задачи ":{" Причина ":"ExecutorLostFailure"," ИД исполнителя ":" 20150526-135628-3255597322-5050-1304-S8 "}," Информация о задаче ":{" ИД задачи ": 12," Индекс ":6," Попытка ": 3, "Время запуска":1437616389702,"ИД исполнителя":"20150526-135628-3255597322-5050-1304-S8","Хост":"uc1f-bioinfocloud-vamp-m-2.c.quantum-device-854"..internal "," Локальность ":" PROCESS_LOCAL "," Спекулятивный ": false," Время получения результата ": 0," Время завершения ": 1437616397743," Ошибка ": истина," Накопления ":[]}} {" Событие ":"SparkListenerStageCompleted"," Информация об этапе ":{" Идентификатор этапа ": 2," Идентификатор этапа ": 0," Имя этапа ":" собирать в /opt/work/V2ProcessRecords.py:215","Number of Задачи ":72," Информация RDD ":[{" ID RDD ":6," Имя ":"PythonRDD"," Идентификаторы родителей ":[0]," Уровень хранения ":{" Использовать диск ": false," Использовать память ": false," Использовать ExternalBlockStore":false," Десериализовано ": false," Репликация ":1}," Количество разделов ":72," Количество кэшируемых разделов ": 0," Объем памяти ":0,"ExternalBlockStore Size":0,"Размер диска":0},{"RDD ID":0,"Имя":"gs://uc1f-bioinfocloud-vamp-m/literature/xml/P*/*.nxml" ","Scope":"{\"id\":\"0\",\"name\":\"wholeTextFiles\"}"," Родительские идентификаторы ":[]," Уровень хранения ":{" Использовать Диск ": false," Использовать память ": false," Использовать ExternalBlockStore":false," Десериализовано ": false," Репликация ":1}," Количество разделов ":72," Количество кэшированных разделов ": 0," Размер памяти ": 0," Размер ExternalBlockStore ": 0," Размер диска ":0}]," Родительские идентификаторы ":[]," Детали ":" "," Время отправки ":1437616365566," Время завершения ":1437616397753,"Причина отказа": "Задание прервано из-за сбоя этапа: Задача 6 в на этапе 2.0 произошел сбой 4 раза, последний сбой: потерянное задание 6.3 на этапе 2.0 (TID 12, uc1f-bioinfocloud-vamp-m-2.c.quantum-device-854.internal): ExecutorLostFailure (executor 20150526-135628-3255597322-5050-1304-S8 потеряно)\nDriver stacktrace:","Accumulables":[]}} {" Событие ":"SparkListenerJobEnd"," Идентификатор задания ": 2," Время завершения ":1437616397755," Результат задания ": { "Результат":"JobFailed","Exception":{"Message":"Задание прервано из-за сбоя этапа: сбой задачи 6 на этапе 2.0 4 раза, последний сбой: потерянное задание 6.3 на этапе 2.0 (TID 12, uc1f- bioinfocloud-vamp-m-2.c.quantum-device-854.internal): ExecutorLostFailure (исполнитель 20150526-135628-3255597322-5050-1304-S8 потерян) \ n Отслеживание стека драйверов: "," Трассировка стека ":[{" Объявление Класс ":"org.apache.spark.scheduler.DAGScheduler"," Имя метода ":"org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages"," Имя файла ":"DAGScheduler.scala"," Номер строки ":1266},{"Объявление класса":"org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1","Имя метода":"apply","File Имя ":"DAGScheduler.scala"," Номер строки ":1257},{" Объявление класса ":" org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1 "," Имя метода ":" apply ", "Имя файла": "DAGScheduler.scala", "Номер строки":1256},{"Объявление класса":"scala.collection.mutable.ResizableArray$class","Имя метода": "foreach", "Имя файла":"ResizableArray.scala","Номер строки":59},{"Объявление класса":"scala.collection.mutable.ArrayBuffer","Имя метода": "foreach", "Имя файла":"ArrayBuffer.scala","Номер строки":47},{"Объявление класса": "org.apache.spark.scheduler.DAGScheduler", "Имя метода":"abortStage","Имя файла": "DAGScheduler.scala", "Номер строки ":1256},{" Объявление класса ":"org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1"," Имя метода ":" apply "," Имя файла ":"DAGScheduler.scala"," Номер строки ":730},{" Объявление класса ":"org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1"," Имя метода ":" apply "," Имя файла ":" DAGScheduler.scala ", "Номер строки":730},{"Объявление класса":"scala.Option","Имя метода ":" foreach "," Имя файла ":"Option.scala"," Номер строки ":236},{" Объявление класса ":"org.apache.spark.scheduler.DAGScheduler"," Имя метода ":"handleTaskSetFailed"," Имя файла ":"DAGScheduler.scala"," Номер строки ":730},{" Объявление класса ":"org.apache.spark.scheduler.DAGSchedulerEventProcessLoop"," Имя метода ":"onReceive"," Файл " Имя ":"DAGScheduler.scala"," Номер строки ":1450},{" Объявление класса ":"org.apache.spark.scheduler.DAGSchedulerEventProcessLoop"," Имя метода ":"onReceive"," Имя файла ":"DAGScheduler.scala"," Номер строки ":1411},{" Объявление класса ":"org.apache.spark.util.EventLoop$$anon$1"," Имя метода ":" run "," Имя файла ":"EventLoop.scala"," Номер строки ":48}]}}}

4 ответа

Самая распространенная причина ExecutorLostFailure, как я понимаю, это OOM в executor.

Чтобы решить проблему OOM, нужно выяснить, что именно вызывает ее. Простое увеличение параллелизма по умолчанию или увеличение памяти исполнителя не является стратегическим решением.

Если вы посмотрите на то, что делает увеличивающийся параллелизм, он пытается создать больше исполнителей, чтобы каждый исполнитель мог работать с все меньшим количеством данных. Но если ваши данные искажены так, что ключ, на котором происходит разделение данных (для параллелизма), содержит больше данных, простое увеличение параллелизма не будет иметь никакого эффекта.

Точно так же простое увеличение памяти Executor будет очень неэффективным способом обработки такого сценария, как если бы с ExecutorLostFailure произошел сбой только одного исполнителя, запрос увеличенной памяти для всех исполнителей заставит ваше приложение потребовать гораздо больше памяти, чем ожидалось.

Эта ошибка возникает из-за сбоя задачи более четырех раз. Попробуйте увеличить параллельность в вашем кластере, используя следующий параметр.

--conf "spark.default.parallelism=100" 

Установите значение параллелизма от 2 до 3, в зависимости от количества ядер, доступных в вашем кластере. Если это не сработает. Попробуйте увеличить параллелизм экспоненциально. т.е. если ваш текущий параллелизм не работает, умножьте его на два и так далее. Также я заметил, что это помогает, если ваш уровень параллелизма простое число, особенно если вы используете groupByKkey.

Трудно сказать, в чем проблема, если не журнал неудачного исполнителя, а не драйвер, но, скорее всего, это проблема с памятью. Попробуйте значительно увеличить номер раздела (если ваш текущий 32, попробуйте 200)

У меня была эта проблема, и проблема для меня была очень высокая частота одного ключа в reduceByKey задача. Это (я думаю) привело к тому, что один из исполнителей собрал огромный список, который затем выдавал бы ошибки OOM.

Решением для меня было просто отфильтровать ключи с большим количеством людей, прежде чем делать reduceByKey, но я ценю, что это может или не может быть возможным в зависимости от вашего приложения. Мне все равно не нужны были все мои данные.

Другие вопросы по тегам