простой столбец, созданный с использованием ntile(), вызывает проблемы с OOM и проблемы с потерянным исполнителем при сохранении в HDFS

У меня есть простая работа по созданию децилей для 1 столбца в таблице, а затем сохранение его в виде таблицы в hdfs. Моя исходная таблица имеет 3 столбца, идентификатор, значение и дату и ~100 миллионов строк. Я хочу создать столбецdecilesполучить децили дляvalue.

Я использовал функцию pyspark, чтобы сделать это следующим образом:

      df=df.withColumn("deciles", psf.ntile(10).over(window.partitionBy().orderBy(psf.col("value").desc())))

# Save
df\
.repartition(300)\
.write.format('parquet')\
.option('compression', 'snappy')\
.option('path', '[path]')\
.mode("overwrite")\
.saveAsTable("[table_name]")

Мои конфигурации искры следующие:

      spark.executor.memory=12G
spark.executor.memoryOverhead=6G
spark.executor.cores=2
spark.driver.memory=20G
spark.dynamicAllocation.enabled=True
spark.shuffle.service.enable=True
spark.dynamicAllocation.initialExecutors=200
spark.dynamicAllocation.minExecutors=150
spark.dynamicAllocation.maxExecutors=250
spark.executor.instances=120
spark.default.parallelism=300

Я также пробовал:

  1. переразбивка на 150, 200, 300, 500
  2. следующая комбинация ядер исполнителя, памяти и memoryOverhead: [1, 18, 2], [2, 16, 4] (комбинации чисто проб и ошибок)
  3. Я также пытался использоватьQuantileDiscretizerи вручную выполняя задачу с длиннымCase Whenзаявление.
  4. Я также пробовал сохранять как gzip. Я могу запрашивать и преобразовыватьdfхорошо, но когда я запускаюntile()script он внезапно не будет сохранять или выполнять какие-либо действия (count(), show(), collect()), поэтому я даже не могу проверить таблицу.

Я продолжаю получать ошибку OOM, и я теряюсь в том, что еще я могу сделать, чтобы заставить эту работу работать. Любая помощь приветствуется!

0 ответов

Другие вопросы по тегам