Оптимизация соединения DataFrame - Broadcast Hash Join
Я пытаюсь эффективно объединить два DataFrames, один из которых большой, а второй немного меньше.
Есть ли способ избежать всего этого перетасовки? Я не могу установить autoBroadCastJoinThreshold
потому что он поддерживает только целые числа - и таблица, которую я пытаюсь транслировать, немного больше, чем целое число байтов.
Есть ли способ заставить трансляцию игнорировать эту переменную?
6 ответов
Широковещательные хеш-соединения (аналогично объединению на стороне карты или объединению на стороне карты в Mapreduce):
В SparkSQL вы можете увидеть тип соединения, которое выполняется queryExecution.executedPlan
, Как и в случае с ядром Spark, если одна из таблиц намного меньше другой, вам может потребоваться широковещательное хеш-соединение. Вы можете намекнуть Spark SQL, что данный DF должен передаваться для соединения, вызывая метод broadcast
на DataFrame
прежде чем присоединиться к нему
Пример: largedataframe.join(broadcast(smalldataframe), "key")
в терминах DWH, где largedataframe может быть как факт
smalldataframe может быть как измерение
Как описано в моей любимой книге (HPS), пожалуйста. см. ниже, чтобы иметь лучшее понимание..
Примечание: выше broadcast
из import org.apache.spark.sql.functions.broadcast
не из SparkContext
Spark также автоматически использует spark.sql.conf.autoBroadcastJoinThreshold
определить, следует ли транслировать таблицу.
Совет: см. Метод DataFrame.explain()
def
explain(): Unit
Prints the physical plan to the console for debugging purposes.
Есть ли способ заставить трансляцию игнорировать эту переменную?
sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold = -1")
НОТА:
Еще одно аналогичное примечание в отношении улья (не искры): аналогичная вещь может быть достигнута с помощью подсказки улья
MAPJOIN
как ниже...
Select /*+ MAPJOIN(b) */ a.key, a.value from a join b on a.key = b.key
hive> set hive.auto.convert.join=true;
hive> set hive.auto.convert.join.noconditionaltask.size=20971520
hive> set hive.auto.convert.join.noconditionaltask=true;
hive> set hive.auto.convert.join.use.nonstaged=true;
hive> set hive.mapjoin.smalltable.filesize = 30000000; // default 25 mb made it as 30mb
Вы можете намекнуть на передачу данных с помощью left.join(broadcast(right), ...)
Настройка spark.sql.autoBroadcastJoinThreshold = -1
полностью отключит трансляцию. См. Другие параметры конфигурации в Spark SQL, DataFrames и Datasets Guide.
Это текущее ограничение искры, см. SPARK-6235. Ограничение в 2 ГБ также применяется для широковещательных переменных.
Вы уверены, что нет другого хорошего способа сделать это, например, другое разбиение?
В противном случае вы можете обойти это, вручную создав несколько широковещательных переменных, каждая из которых <2 ГБ.
Я обнаружил, что этот код работает для Broadcast Join в Spark 2.11 версии 2.0.0.
import org.apache.spark.sql.functions.broadcast
val employeesDF = employeesRDD.toDF
val departmentsDF = departmentsRDD.toDF
// materializing the department data
val tmpDepartments = broadcast(departmentsDF.as("departments"))
import context.implicits._
employeesDF.join(broadcast(tmpDepartments),
$"depId" === $"id", // join by employees.depID == departments.id
"inner").show()
Вот ссылка на приведенный выше код. Хеннинг Кропп Блог, трансляция Присоединиться к Spark
Использование подсказок соединения будет иметь приоритет над конфигурацией.
autoBroadCastJoinThreshold
, поэтому использование подсказки всегда игнорирует этот порог.
Кроме того, при использовании подсказки соединения адаптивное выполнение запроса (начиная с Spark 3.x) также не изменит стратегию, указанную в подсказке.
В Spark SQL вы можете применять подсказки соединения, как показано ниже:
SELECT /*+ BROADCAST */ a.id, a.value FROM a JOIN b ON a.id = b.id
SELECT /*+ BROADCASTJOIN */ a.id, a.value FROM a JOIN b ON a.id = b.id
SELECT /*+ MAPJOIN */ a.id, a.value FROM a JOIN b ON a.id = b.id
Обратите внимание, что ключевые слова BROADCAST, BROADCASTJOIN и MAPJOIN одинаковы.