Spark Bucketing производительность чтения
Версия Spark - 2.2.1.
Я создал таблицу с 64 сегментами, я выполняю функцию агрегирования select t1.ifa,count(*) from $tblName t1 where t1.date_ = '2018-01-01' group by ifa
, Я вижу, что 64 задачи в пользовательском интерфейсе Spark, которые используют только 4 исполнителя (у каждого исполнителя по 16 ядер) из 20. Есть ли способ, с помощью которого я могу масштабировать количество задач или таким образом следует выполнять запросы с группировкой (количество работающих ядер? как количество ведер)?
Вот таблица создания:
sql("""CREATE TABLE level_1 (
bundle string,
date_ date,
hour SMALLINT)
USING ORC
PARTITIONED BY (date_ , hour )
CLUSTERED BY (ifa)
SORTED BY (ifa)
INTO 64 BUCKETS
LOCATION 'XXX'""")
Вот запрос:
sql(s"select t1.ifa,count(*) from $tblName t1 where t1.date_ = '2018-01-01' group by ifa").show
2 ответа
С бэкетингом количество задач == количество сегментов, поэтому вы должны знать количество ядер / задач, которые вы хотите / хотите использовать, а затем установить его в качестве номера сегментов.
Количество задач = количество сегментов, вероятно, является наиболее важным и мало обсуждаемым аспектом сегментирования в Spark. Сегменты (по умолчанию) исторически использовались исключительно для создания «предварительно перемешанных» фреймов данных, которые могут оптимизировать большие объединения. Когда вы читаете таблицу с разделением на сегменты, все файлы или файлы для каждого сегмента читаются одним искровым исполнителем (30 сегментов = 30 искровых задач при чтении данных), что позволяет соединить таблицу с другой таблицей, разделенной на тот же # колонн. Я считаю такое поведение раздражающим и, как и упомянутый выше пользователь, проблематичным для таблиц, которые могут расти.
You might be asking yourself now, why and when in the would I ever want to bucket and when will my real-world data grow exactly in the same way over time? (you probably partitioned your big data by date, be honest) In my experience you probably don't have a great use case to bucket tables in the default spark way. BUT ALL IS NOT LOST FOR BUCKETING!
Введите «ведро-обрезку». Отсечение ведра работает только тогда, когда вы объединяете ОДИН столбец, но потенциально является вашим лучшим другом в Spark с момента появления SparkSQL и Dataframes. Это позволяет Spark определять, какие файлы в вашей таблице содержат определенные значения на основе некоторого фильтра в вашем запросе, что может Существенно уменьшить количество файлов, которые искра физически читает, что приводит к чрезвычайно эффективным и быстрым запросам.(Я взял 2+ часовых запросов до 2 минут и 1/100 рабочих Spark). Но вам, вероятно, все равно, потому что количество сегментов для задач означает, что ваша таблица никогда не будет «увеличиваться в масштабе», если у вас слишком много файлов в корзине для каждого раздела.
Войдите в Spark 3.2.0. Появляется новая функция, которая позволит обрезке ведра оставаться активной, когда вы отключите чтение на основе ведра, что позволит вам распределять искровые чтения с отсечкой / сканированием ведра. У меня также есть трюк для этого с spark <3.2 следующим образом.(обратите внимание, что листовое сканирование файлов с vanilla spark.read на s3 добавляется накладных расходов, но если ваша таблица большая, это не имеет значения, так как ваша оптимизированная для ведра таблица будет распределенным чтением среди всех ваших доступных искровых рабочих и теперь будет масштабируемый)
val table = "ex_db.ex_tbl"
val target_partition = "2021-01-01"
val bucket_target = "valuex"
val bucket_col = "bucket_col"
val partition_col = "date"
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.{FileScanRDD,FilePartition}
val df = spark.table(tablename).where((col(partition_col)===lit(target_partition)) && (col(bucket_col)===lit(bucket_target)))
val sparkplan = df.queryExecution.executedPlan
val scan = sparkplan.collectFirst { case exec: FileSourceScanExec => exec }.get
val rdd = scan.inputRDDs.head.asInstanceOf[FileScanRDD]
val bucket_files = for
{ FilePartition(bucketId, files) <- rdd.filePartitions f <- files }
yield s"$f".replaceAll("path: ", "").split(",")(0)
val format = bucket_files(0).split("
.").last
val result_df = spark.read.option("mergeSchema", "False").format(format).load(bucket_files:_*).where(col(bucket_col) === lit(bucket_target))