Можем ли мы использовать CBO Spark (Оптимизатор затрат) с собственным паркетом или в фрейме данных в памяти?

Скажем, я хочу объединить 3 таблицы A,B,C с внутренним объединением, а C очень маленьким.

#DUMMY EXAMPLE with IN-MEMORY table, but same issue if load table using spark.read.parquet("")
var A = (1 to 1000000).toSeq.toDF("A")
var B = (1 to 1000000).toSeq.toDF("B")
var C = (1 to 10).toSeq.toDF("C")

И я не имею никакого контроля над тем, какой порядок мне выдается:

CASE1 = A.join(B,expr("A=B"),"inner").join(C,expr("A=C"),"inner")
CASE2 = A.join(C,expr("A=C"),"inner").join(B,expr("A=B"),"inner")

Запуск обоих показывает, что CASE1 работает на 30-40% медленнее, чем CASE2.

Таким образом, вопрос заключается в следующем: как использовать CBO Spark для автоматического перевода CASE1 в CASE2 для таблицы в памяти или таблицы, загруженной из программы чтения паркета Spark?

Я пытался сделать:

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.cbo.enabled", "true")
A.createOrReplaceTempView("A")
spark.sql("ANALYZE TABLE A COMPUTE STATISTICS")

но это бросает

org.apache.spark.sql.catalyst.analysis.NoSuchTableException: Table or view 'a' not found in database 'default'

Есть ли другой способ активировать CBO без необходимости сохранять таблицу в Hive?


Приложение:

  1. Даже при использовании spark.conf.set("spark.sql.cbo.enabled", "true") в SparkWebUI не отображается оценка стоимости.
  2. Показ CASE1.explain! = CASE2.explain

CASE1.explain

== Physical Plan ==
*(5) SortMergeJoin [A#3], [C#13], Inner
:- *(3) SortMergeJoin [A#3], [B#8], Inner
:  :- *(1) Sort [A#3 ASC NULLS FIRST], false, 0
:  :  +- Exchange hashpartitioning(A#3, 200)
:  :     +- LocalTableScan [A#3]
:  +- *(2) Sort [B#8 ASC NULLS FIRST], false, 0
:     +- Exchange hashpartitioning(B#8, 200)
:        +- LocalTableScan [B#8]
+- *(4) Sort [C#13 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(C#13, 200)
      +- LocalTableScan [C#13]

CASE2.explain

== Physical Plan ==
*(5) SortMergeJoin [A#3], [B#8], Inner
:- *(3) SortMergeJoin [A#3], [C#13], Inner
:  :- *(1) Sort [A#3 ASC NULLS FIRST], false, 0
:  :  +- Exchange hashpartitioning(A#3, 200)
:  :     +- LocalTableScan [A#3]
:  +- *(2) Sort [C#13 ASC NULLS FIRST], false, 0
:     +- Exchange hashpartitioning(C#13, 200)
:        +- LocalTableScan [C#13]
+- *(4) Sort [B#8 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(B#8, 200)
      +- LocalTableScan [B#8]

1 ответ

Решение

Нет, короткий ответ, что это невозможно.

Этот https://databricks.com/blog/2017/08/31/cost-based-optimizer-in-apache-spark-2-2.html предоставляет превосходный обзор того, что возможно, и точки зрения на постоянные хранилища данных.

Другие вопросы по тегам