Проблема OOM контейнера при записи Dataframe в файлы паркета в Spark Job

Я использую рабочую область машинного обучения в Cloudera Data Platform (CDP). Я создал сеанс с памятью 4vCPU/16 GiB и включил Spark 3.2.0.

Я использую spark для загрузки данных за один месяц (размер данных за весь месяц составляет около 12 ГБ) и выполняю некоторые преобразования, а затем записываю данные в виде файлов паркета на AWS S3.

Моя конфигурация сеанса Spark выглядит следующим образом:

      SparkSession
         .builder
         .appName(appName)
         .config("spark.driver.memory", "8G")
         .config("spark.dynamicAllocation.enabled", "true")
         .config("spark.dynamicAllocation.minExecutors", "4")
         .config("spark.dynamicAllocation.maxExecutors", "20")
         .config("spark.executor.cores", "4")
         .config("spark.executor.memory", "8G")
         .config("spark.sql.shuffle.partitions", 500)
......

Прежде чем данные записываются в файлы паркета, они перераспределяются: как предложил @Koedlt, я исправил столбец «соль».

старый:

      df.withColumn("salt", lit(random.randrange(100)))
.repartition("date_year", "date_month", "date_day", "salt")
.drop("salt").write.partitionBy("date_year", "date_month")
.mode("overwrite").parquet(SOME__PATH)

новый:

      df.withColumn("salt", floor(rand() * 100))
.repartition("date_year", "date_month", "date_day", "salt")
.drop("salt").write.partitionBy("date_year", "date_month")
.mode("overwrite").parquet(SOME__PATH)

Преобразование данных с помощью spark прошло успешно. Но задание искры всегда терпело неудачу на последнем шаге при записи данных в файлы паркета.

Ниже приведен пример сообщения об ошибке:

      23/01/15 21:10:59 678 ERROR TaskSchedulerImpl: Lost executor 2 on 100.100.18.155: 
The executor with id 2 exited with exit code -1(unexpected).
The API gave the following brief reason: Evicted
The API gave the following message: Pod ephemeral local storage usage exceeds the total limit of containers 10Gi. 

Я думаю, что нет никаких проблем с моей конфигурацией искры. Проблема в конфигурации эфемерного ограничения размера локального хранилища в кубенете, которое я не имею права менять.

Может ли кто-нибудь объяснить, почему это произошло и каково возможное решение для этого?

1 ответ

Я вижу проблему в этой строке:

      df.withColumn("salt", lit(random.randrange(100)))

Что происходит, когда вы делаете это,random.randrange(100)оценивается один раз. Затем вы создаете буквальный столбец с постоянно повторяющимся значением. Таким образом, вы, по сути, вообще не солите, сохраняя исходные проблемы с искажением данных. Возможно, они лежат в основе вашей эфемерной проблемы с локальным хранилищем.

Вам необходимо использоватьpyspark.sql.functions.randфункция для правильного создания случайных столбцов и соли.

Покажем небольшой пример. Со следующими простыми входными данными:

      df = spark.createDataFrame(
    [
        (1, 1, "ABC"),
        (1, 2, "BCD"),
        (1, 3, "DEF"),
        (2, 1, "EFG"),
        (2, 2, "GHI"),
        (2, 3, "HIJ"),
        (3, 1, "EFG"),
        (3, 2, "BCD"),
        (3, 3, "HIJ"),
    ],
    ["KEY", "ORDER", "RESP"]
)

Делать то, что вы делали:

      df.withColumn("salt", lit(random.randrange(100))).show()
+---+-----+----+----+
|KEY|ORDER|RESP|salt|
+---+-----+----+----+
|  1|    1| ABC|  86|
|  1|    2| BCD|  86|
|  1|    3| DEF|  86|
|  2|    1| EFG|  86|
|  2|    2| GHI|  86|
|  2|    3| HIJ|  86|
|  3|    1| EFG|  86|
|  3|    2| BCD|  86|
|  3|    3| HIJ|  86|
+---+-----+----+----+

В то время как использование правильных функций pyspark:

      df.withColumn("salt", floor(rand() * 100)).show()
+---+-----+----+----+
|KEY|ORDER|RESP|salt|
+---+-----+----+----+
|  1|    1| ABC|  66|
|  1|    2| BCD|  40|
|  1|    3| DEF|  99|
|  2|    1| EFG|  55|
|  2|    2| GHI|  23|
|  2|    3| HIJ|  41|
|  3|    1| EFG|  61|
|  3|    2| BCD|   0|
|  3|    3| HIJ|  33|
+---+-----+----+----+
Другие вопросы по тегам