Как предотвратить Spark оптимизацию

Иногда Spark "оптимизирует" фрейм данных неэффективным способом. Рассмотрим следующий пример в Spark 2.1 (также может быть воспроизведен в Spark 1.6):

val df = sparkContext.parallelize((1 to 500).map(i=> scala.util.Random.nextDouble),100).toDF("value")

val expensiveUDF = udf((d:Double) => {Thread.sleep(100);d})

val df_result = df
.withColumn("udfResult",expensiveUDF($"value"))

df_result
.coalesce(1)
.saveAsTable(tablename)

В этом примере я хочу написать 1 файл после дорогостоящей трансформации кадра данных (это всего лишь пример, демонстрирующий проблему). Искра движется coalesce(1) до такой степени, что UDF применяется только к кадру данных, содержащему 1 раздел, тем самым разрушая параллелизм (интересно repartition(1) не ведет себя таким образом).

В целом, такое поведение возникает, когда я хочу увеличить параллелизм в определенной части моего преобразования, но затем уменьшить параллелизм.

Я нашел один обходной путь, который состоит в кэшировании фрейма данных и последующем запуске полной оценки фрейма данных:

val df = sparkContext.parallelize((1 to 500).map(i=> scala.util.Random.nextDouble),100).toDF("value")

val expensiveUDF = udf((d:Double) => {Thread.sleep(100);d})

val df_result = df
.withColumn("udfResult",expensiveUDF($"value"))
.cache

df_result.rdd.count // trigger computation

df_result
.coalesce(1)
.saveAsTable(tablename)

Мой вопрос: есть ли другой способ сказать Спарку, чтобы он не менял положение определенной трансформации?

1 ответ

Решение

На самом деле это не из-за оптимизации SparkSQL, SparkSQL не меняет положение оператора Coalesce, как показывает выполненный план:

Coalesce 1
+- *Project [value#2, UDF(value#2) AS udfResult#11]
   +- *SerializeFromObject [input[0, double, false] AS value#2]
      +- Scan ExternalRDDScan[obj#1]

Я цитирую абзац из описания coalesce API:

Примечание: этот абзац добавлен jira SPARK-19399. Так что это не должно быть найдено в API 2.0.

Однако, если вы делаете резкое объединение, например, с numPartitions = 1, это может привести к тому, что ваши вычисления будут выполняться на меньшем количестве узлов, чем вам нравится (например, один узел в случае numPartitions = 1). Чтобы избежать этого, вы можете позвонить передел. Это добавит шаг в случайном порядке, но означает, что текущие разделы восходящего потока будут выполняться параллельно (независимо от текущего разделения).

API coalesce не выполняет тасование, но приводит к узкой зависимости между предыдущим RDD и текущим RDD. Поскольку СДР является ленивой оценкой, вычисления фактически выполняются с объединенными разделами.

Чтобы предотвратить это, вы должны использовать API перераспределения.

Другие вопросы по тегам