Как динамически выбирать spark.sql.shuffle.partitions
В настоящее время я обрабатываю данные с помощью раздела spark и foreach, открываю соединение с mysql и вставляю его в базу данных в количестве 1000. Как указано в значении по умолчанию SparkDocumentation: spark.sql.shuffle.partitions
200, но я хочу, чтобы он был динамичным. Итак, как мне рассчитать это. Следовательно, ни выбор очень высокого значения, вызывающего снижение производительности, ни выбор очень малого значения, вызывающего OOM
,
1 ответ
Попробуйте вариант ниже -
val numExecutors = spark.conf.get("spark.executor.instances").toInt
val numExecutorsCores = spark.conf.get("spark.executor.cores").toInt
val numShufflePartitions = (numExecutors * numExecutorsCores)
spark.conf.set("spark.sql.shuffle.partitions", numShufflePartitions)
Это поможет вам установить правильное количество разделов в случайном порядке в зависимости от ядер исполнителей и исполнителей, используемых для вашего искрового задания, без ущерба для производительности и возникновения проблем с нехваткой памяти.
Если вы все еще выходите из памяти, они устанавливают свойство ниже -
spark.conf.set("spark.executor.memoryOverhead", "3G")
Другой вариант - рассчитать Dataframe
размер и didvie
это от hdfs
размер блока и используйте полученное число для установки spark.sql.shuffle.partitions
.
Вы можете использовать для этого метод df.repartition(numPartitions). Вы можете принять решение на основе ввода / промежуточного вывода и передать numPartitions методу repartition().
df.repartition(numPartitions) or rdd.repartition(numPartitions)