Проблема OOM контейнера при записи Dataframe в файлы паркета в Spark Job
Я использую рабочую область машинного обучения в Cloudera Data Platform (CDP). Я создал сеанс с памятью 4vCPU/16 GiB и включил Spark 3.2.0.
Я использую spark для загрузки данных за один месяц (размер данных за весь месяц составляет около 12 ГБ) и выполняю некоторые преобразования, а затем записываю данные в виде файлов паркета на AWS S3.
Моя конфигурация сеанса Spark выглядит следующим образом:
SparkSession
.builder
.appName(appName)
.config("spark.driver.memory", "8G")
.config("spark.dynamicAllocation.enabled", "true")
.config("spark.dynamicAllocation.minExecutors", "4")
.config("spark.dynamicAllocation.maxExecutors", "20")
.config("spark.executor.cores", "4")
.config("spark.executor.memory", "8G")
.config("spark.sql.shuffle.partitions", 500)
......
Прежде чем данные записываются в файлы паркета, они перераспределяются: как предложил @Koedlt, я исправил столбец «соль».
старый:
df.withColumn("salt", lit(random.randrange(100)))
.repartition("date_year", "date_month", "date_day", "salt")
.drop("salt").write.partitionBy("date_year", "date_month")
.mode("overwrite").parquet(SOME__PATH)
новый:
df.withColumn("salt", floor(rand() * 100))
.repartition("date_year", "date_month", "date_day", "salt")
.drop("salt").write.partitionBy("date_year", "date_month")
.mode("overwrite").parquet(SOME__PATH)
Преобразование данных с помощью spark прошло успешно. Но задание искры всегда терпело неудачу на последнем шаге при записи данных в файлы паркета.
Ниже приведен пример сообщения об ошибке:
23/01/15 21:10:59 678 ERROR TaskSchedulerImpl: Lost executor 2 on 100.100.18.155:
The executor with id 2 exited with exit code -1(unexpected).
The API gave the following brief reason: Evicted
The API gave the following message: Pod ephemeral local storage usage exceeds the total limit of containers 10Gi.
Я думаю, что нет никаких проблем с моей конфигурацией искры. Проблема в конфигурации эфемерного ограничения размера локального хранилища в кубенете, которое я не имею права менять.
Может ли кто-нибудь объяснить, почему это произошло и каково возможное решение для этого?
1 ответ
Я вижу проблему в этой строке:
df.withColumn("salt", lit(random.randrange(100)))
Что происходит, когда вы делаете это,random.randrange(100)
оценивается один раз. Затем вы создаете буквальный столбец с постоянно повторяющимся значением. Таким образом, вы, по сути, вообще не солите, сохраняя исходные проблемы с искажением данных. Возможно, они лежат в основе вашей эфемерной проблемы с локальным хранилищем.
Вам необходимо использоватьpyspark.sql.functions.rand
функция для правильного создания случайных столбцов и соли.
Покажем небольшой пример. Со следующими простыми входными данными:
df = spark.createDataFrame(
[
(1, 1, "ABC"),
(1, 2, "BCD"),
(1, 3, "DEF"),
(2, 1, "EFG"),
(2, 2, "GHI"),
(2, 3, "HIJ"),
(3, 1, "EFG"),
(3, 2, "BCD"),
(3, 3, "HIJ"),
],
["KEY", "ORDER", "RESP"]
)
Делать то, что вы делали:
df.withColumn("salt", lit(random.randrange(100))).show()
+---+-----+----+----+
|KEY|ORDER|RESP|salt|
+---+-----+----+----+
| 1| 1| ABC| 86|
| 1| 2| BCD| 86|
| 1| 3| DEF| 86|
| 2| 1| EFG| 86|
| 2| 2| GHI| 86|
| 2| 3| HIJ| 86|
| 3| 1| EFG| 86|
| 3| 2| BCD| 86|
| 3| 3| HIJ| 86|
+---+-----+----+----+
В то время как использование правильных функций pyspark:
df.withColumn("salt", floor(rand() * 100)).show()
+---+-----+----+----+
|KEY|ORDER|RESP|salt|
+---+-----+----+----+
| 1| 1| ABC| 66|
| 1| 2| BCD| 40|
| 1| 3| DEF| 99|
| 2| 1| EFG| 55|
| 2| 2| GHI| 23|
| 2| 3| HIJ| 41|
| 3| 1| EFG| 61|
| 3| 2| BCD| 0|
| 3| 3| HIJ| 33|
+---+-----+----+----+