Сбой задания Pyspark в Google Cloud Dataproc

Я создал кластер Dataproc с 1 мастером и 10 узлами. Все они имеют одинаковую конфигурацию процессора и памяти: 32 vCPU, 120 ГБ памяти. Когда я представил работу, которая обрабатывает большой объем данных и расчетов. Работа не удалась.

Из журнала я не совсем уверен, что вызвало сбой. Но я видел сообщение об ошибке, связанной с памятью, от tJob#: job-c46fc848-6: Контейнер уничтожен YARN за превышение пределов памяти. Используется 24,1 ГБ из 24 ГБ физической памяти. Попробуйте увеличить spark.yarn.executor.memoryOverhead.

Поэтому я попробовал несколько решений, которые я нашел из других постов. Например, я пытался увеличить spark.executor.memoryOverhead и spark.driver.maxResultSize в разделе "Свойства" при отправке задания из консоли "Задания". Задание # find-duplicate-job-c46fc848-7 все еще не выполнено.

Я также видел предупреждающие сообщения и не совсем уверен, что это значит: 18/06/04 17:13:25 WARN org.apache.spark.storage.BlockManagerMasterEndpoint: больше нет доступных реплик для rdd_43_155!

Я собираюсь попытаться создать кластер более высокого уровня, чтобы увидеть, работает ли он. Но я сомневаюсь, что это решит проблему, так как кластер с 1 ведущим и 10 узлами с 32 vCPU, 120 ГБ памяти уже очень мощный.

Надеюсь получить помощь от опытных пользователей и экспертов. Заранее спасибо!

1 ответ

Решение

Коренная причина сбоя была связана с памятью, вызванной самопереключением. Это все еще терпело неудачу, даже когда я продолжал увеличивать мощность процессора и память. Таким образом, решение этого состоит в следующем.

  1. Используйте функцию repartition() для перераспределения после объединения до следующего преобразования. Это исправит проблему с перекосом данных. Пример: df_joined = df_joined.repartition(разделы)
  2. Трансляция правого стола.
  3. Разбейте его на 10 итераций. В каждой итерации я обрабатываю только 1/10 левой таблицы, объединенной с полными данными правой таблицы.

Смотрите пример кода:

groups = 10 <br/>
for x in range(0, groups): 
  df_joined = df1.join(broadcast(df2), (df1.authors == df2.authors)).where((col("df1.content_id") % groups == x)) 

С учетом вышеупомянутых 3 методов я смог завершить работу за 1,5 часа и использовал только 1 главный и 4 рабочих узла (8 ЦП и 30 ГБ для каждого виртуального компьютера).

Другие вопросы по тегам