Как предотвратить Spark оптимизацию
Иногда Spark "оптимизирует" фрейм данных неэффективным способом. Рассмотрим следующий пример в Spark 2.1 (также может быть воспроизведен в Spark 1.6):
val df = sparkContext.parallelize((1 to 500).map(i=> scala.util.Random.nextDouble),100).toDF("value")
val expensiveUDF = udf((d:Double) => {Thread.sleep(100);d})
val df_result = df
.withColumn("udfResult",expensiveUDF($"value"))
df_result
.coalesce(1)
.saveAsTable(tablename)
В этом примере я хочу написать 1 файл после дорогостоящей трансформации кадра данных (это всего лишь пример, демонстрирующий проблему). Искра движется coalesce(1)
до такой степени, что UDF применяется только к кадру данных, содержащему 1 раздел, тем самым разрушая параллелизм (интересно repartition(1)
не ведет себя таким образом).
В целом, такое поведение возникает, когда я хочу увеличить параллелизм в определенной части моего преобразования, но затем уменьшить параллелизм.
Я нашел один обходной путь, который состоит в кэшировании фрейма данных и последующем запуске полной оценки фрейма данных:
val df = sparkContext.parallelize((1 to 500).map(i=> scala.util.Random.nextDouble),100).toDF("value")
val expensiveUDF = udf((d:Double) => {Thread.sleep(100);d})
val df_result = df
.withColumn("udfResult",expensiveUDF($"value"))
.cache
df_result.rdd.count // trigger computation
df_result
.coalesce(1)
.saveAsTable(tablename)
Мой вопрос: есть ли другой способ сказать Спарку, чтобы он не менял положение определенной трансформации?
1 ответ
На самом деле это не из-за оптимизации SparkSQL, SparkSQL не меняет положение оператора Coalesce, как показывает выполненный план:
Coalesce 1
+- *Project [value#2, UDF(value#2) AS udfResult#11]
+- *SerializeFromObject [input[0, double, false] AS value#2]
+- Scan ExternalRDDScan[obj#1]
Я цитирую абзац из описания coalesce API:
Примечание: этот абзац добавлен jira SPARK-19399. Так что это не должно быть найдено в API 2.0.
Однако, если вы делаете резкое объединение, например, с numPartitions = 1, это может привести к тому, что ваши вычисления будут выполняться на меньшем количестве узлов, чем вам нравится (например, один узел в случае numPartitions = 1). Чтобы избежать этого, вы можете позвонить передел. Это добавит шаг в случайном порядке, но означает, что текущие разделы восходящего потока будут выполняться параллельно (независимо от текущего разделения).
API coalesce не выполняет тасование, но приводит к узкой зависимости между предыдущим RDD и текущим RDD. Поскольку СДР является ленивой оценкой, вычисления фактически выполняются с объединенными разделами.
Чтобы предотвратить это, вы должны использовать API перераспределения.