простой столбец, созданный с использованием ntile(), вызывает проблемы с OOM и проблемы с потерянным исполнителем при сохранении в HDFS
У меня есть простая работа по созданию децилей для 1 столбца в таблице, а затем сохранение его в виде таблицы в hdfs. Моя исходная таблица имеет 3 столбца, идентификатор, значение и дату и ~100 миллионов строк. Я хочу создать столбецdeciles
получить децили дляvalue
.
Я использовал функцию pyspark, чтобы сделать это следующим образом:
df=df.withColumn("deciles", psf.ntile(10).over(window.partitionBy().orderBy(psf.col("value").desc())))
# Save
df\
.repartition(300)\
.write.format('parquet')\
.option('compression', 'snappy')\
.option('path', '[path]')\
.mode("overwrite")\
.saveAsTable("[table_name]")
Мои конфигурации искры следующие:
spark.executor.memory=12G
spark.executor.memoryOverhead=6G
spark.executor.cores=2
spark.driver.memory=20G
spark.dynamicAllocation.enabled=True
spark.shuffle.service.enable=True
spark.dynamicAllocation.initialExecutors=200
spark.dynamicAllocation.minExecutors=150
spark.dynamicAllocation.maxExecutors=250
spark.executor.instances=120
spark.default.parallelism=300
Я также пробовал:
- переразбивка на 150, 200, 300, 500
- следующая комбинация ядер исполнителя, памяти и memoryOverhead: [1, 18, 2], [2, 16, 4] (комбинации чисто проб и ошибок)
- Я также пытался использовать
QuantileDiscretizer
и вручную выполняя задачу с длиннымCase When
заявление. - Я также пробовал сохранять как gzip. Я могу запрашивать и преобразовывать
df
хорошо, но когда я запускаюntile()
script он внезапно не будет сохранять или выполнять какие-либо действия (count(), show(), collect()), поэтому я даже не могу проверить таблицу.
Я продолжаю получать ошибку OOM, и я теряюсь в том, что еще я могу сделать, чтобы заставить эту работу работать. Любая помощь приветствуется!