Как динамически выбирать spark.sql.shuffle.partitions

В настоящее время я обрабатываю данные с помощью раздела spark и foreach, открываю соединение с mysql и вставляю его в базу данных в количестве 1000. Как указано в значении по умолчанию SparkDocumentation: spark.sql.shuffle.partitions 200, но я хочу, чтобы он был динамичным. Итак, как мне рассчитать это. Следовательно, ни выбор очень высокого значения, вызывающего снижение производительности, ни выбор очень малого значения, вызывающего OOM,

1 ответ

Попробуйте вариант ниже -

val numExecutors         = spark.conf.get("spark.executor.instances").toInt

val numExecutorsCores    = spark.conf.get("spark.executor.cores").toInt

val numShufflePartitions = (numExecutors * numExecutorsCores)

spark.conf.set("spark.sql.shuffle.partitions", numShufflePartitions)

Это поможет вам установить правильное количество разделов в случайном порядке в зависимости от ядер исполнителей и исполнителей, используемых для вашего искрового задания, без ущерба для производительности и возникновения проблем с нехваткой памяти.

Если вы все еще выходите из памяти, они устанавливают свойство ниже -

spark.conf.set("spark.executor.memoryOverhead", "3G")

Другой вариант - рассчитать Dataframe размер и didvie это от hdfs размер блока и используйте полученное число для установки spark.sql.shuffle.partitions.

Вы можете использовать для этого метод df.repartition(numPartitions). Вы можете принять решение на основе ввода / промежуточного вывода и передать numPartitions методу repartition().

df.repartition(numPartitions)   or rdd.repartition(numPartitions)
Другие вопросы по тегам