Как определить разбиение DataFrame?
Я начал использовать Spark SQL и DataFrames в Spark 1.4.0. Я хочу определить пользовательский разделитель в DataFrames, в Scala, но не вижу, как это сделать.
Одна из таблиц данных, с которыми я работаю, содержит список транзакций по счетам, silimar в следующем примере.
Account Date Type Amount
1001 2014-04-01 Purchase 100.00
1001 2014-04-01 Purchase 50.00
1001 2014-04-05 Purchase 70.00
1001 2014-04-01 Payment -150.00
1002 2014-04-01 Purchase 80.00
1002 2014-04-02 Purchase 22.00
1002 2014-04-04 Payment -120.00
1002 2014-04-04 Purchase 60.00
1003 2014-04-02 Purchase 210.00
1003 2014-04-03 Purchase 15.00
По крайней мере изначально, большинство расчетов будет происходить между транзакциями внутри учетной записи. Поэтому я хотел бы разделить данные так, чтобы все транзакции для учетной записи находились в одном разделе Spark.
Но я не вижу способа определить это. Класс DataFrame имеет метод repartition(Int), в котором вы можете указать количество создаваемых разделов. Но я не вижу ни одного доступного метода для определения пользовательского разделителя для DataFrame, например, который может быть указан для RDD.
Исходные данные хранятся в Parquet. Я видел, что при записи DataFrame в Parquet вы можете указать столбец для разделения, поэтому, вероятно, я мог бы сказать Parquet разделить его данные по столбцу "Account". Но может быть миллионы учетных записей, и если я правильно понимаю Parquet, это создаст отдельный каталог для каждой учетной записи, так что это не похоже на разумное решение.
Есть ли способ заставить Spark разделить этот DataFrame так, чтобы все данные для учетной записи находились в одном разделе?
5 ответов
Spark> = 2.3.0
SPARK-22614 подвергает разделению диапазона.
val partitionedByRange = df.repartitionByRange(42, $"k")
partitionedByRange.explain
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k ASC NULLS FIRST], 42
// +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6]
//
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- Project [_1#2 AS k#5, _2#3 AS v#6]
// +- LocalRelation [_1#2, _2#3]
//
// == Optimized Logical Plan ==
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- LocalRelation [k#5, v#6]
//
// == Physical Plan ==
// Exchange rangepartitioning(k#5 ASC NULLS FIRST, 42)
// +- LocalTableScan [k#5, v#6]
SPARK-22389 предоставляет возможность разделения на внешние форматы в Data Source API v2.
Spark >= 1.6.0
В Spark >= 1.6 можно использовать разбиение по столбцам для запросов и кеширования. См.: SPARK-11410 и SPARK-4849 с использованием repartition
метод:
val df = Seq(
("A", 1), ("B", 2), ("A", 3), ("C", 1)
).toDF("k", "v")
val partitioned = df.repartition($"k")
partitioned.explain
// scala> df.repartition($"k").explain(true)
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
// +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
//
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
// +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
//
// == Optimized Logical Plan ==
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
// +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
//
// == Physical Plan ==
// TungstenExchange hashpartitioning(k#7,200), None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
// +- Scan PhysicalRDD[_1#5,_2#6]
В отличие от RDDs
искра Dataset
(в том числе Dataset[Row]
ака DataFrame
) не может использовать пользовательский разделитель, как сейчас. Обычно вы можете решить эту проблему, создав искусственный столбец разделения, но он не даст вам такой же гибкости.
Искра < 1.6.0:
Одна вещь, которую вы можете сделать, это предварительно разбить входные данные перед созданием DataFrame
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.HashPartitioner
val schema = StructType(Seq(
StructField("x", StringType, false),
StructField("y", LongType, false),
StructField("z", DoubleType, false)
))
val rdd = sc.parallelize(Seq(
Row("foo", 1L, 0.5), Row("bar", 0L, 0.0), Row("??", -1L, 2.0),
Row("foo", -1L, 0.0), Row("??", 3L, 0.6), Row("bar", -3L, 0.99)
))
val partitioner = new HashPartitioner(5)
val partitioned = rdd.map(r => (r.getString(0), r))
.partitionBy(partitioner)
.values
val df = sqlContext.createDataFrame(partitioned, schema)
поскольку DataFrame
создание из RDD
требуется только простая фаза карты, существующий макет раздела должен быть сохранен *
assert(df.rdd.partitions == partitioned.partitions)
Так же, как вы можете перераспределить существующие DataFrame
:
sqlContext.createDataFrame(
df.rdd.map(r => (r.getInt(1), r)).partitionBy(partitioner).values,
df.schema
)
Так что, похоже, это не невозможно. Остается вопрос, имеет ли это смысл. Я буду утверждать, что в большинстве случаев это не так:
Перераспределение является дорогостоящим процессом. В типичном сценарии большая часть данных должна быть сериализована, перемешана и десериализована. С другой стороны, число операций, которые могут извлечь выгоду из предварительно секционированных данных, относительно невелико и дополнительно ограничено, если внутренний API не предназначен для использования этого свойства.
- включается в некоторых сценариях, но это потребует внутренней поддержки,
- вызовы оконных функций с соответствующим разделителем. То же, что и выше, ограничено определением одного окна. Это уже разделено внутри, поэтому предварительное разбиение может быть избыточным,
- простые агрегаты с
GROUP BY
- возможно уменьшить объем памяти временных буферов **, но общая стоимость намного выше. Более или менее эквивалентноgroupByKey.mapValues(_.reduce)
(текущее поведение) противreduceByKey
(Предварительно секционирование). Вряд ли будет полезным на практике. - сжатие данных с
SqlContext.cacheTable
, Так как похоже, что он использует кодирование длины пробега, применяяOrderedRDDFunctions.repartitionAndSortWithinPartitions
может улучшить степень сжатия.
Производительность сильно зависит от распределения ключей. Если это искажено, это приведет к неоптимальному использованию ресурсов. В худшем случае будет невозможно закончить работу вообще.
- Весь смысл использования высокоуровневого декларативного API состоит в том, чтобы изолировать себя от низкоуровневых деталей реализации. Как уже упоминалось Dawid Wysakowicz и Romi Kuntsman, оптимизация - это работа оптимизатора Catalyst. Это довольно искушенный зверь, и я действительно сомневаюсь, что вы можете легко улучшить это, не углубляясь в его внутренности.
Родственные концепции
Разделение на источники JDBC:
Поддержка источников данных JDBC predicates
аргумент. Может использоваться следующим образом:
sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props)
Он создает один раздел JDBC для каждого предиката. Имейте в виду, что если наборы, созданные с использованием отдельных предикатов, не пересекаются, вы увидите дубликаты в итоговой таблице.
partitionBy
метод вDataFrameWriter
:
искра DataFrameWriter
обеспечивает partitionBy
метод, который можно использовать для "разделения" данных при записи. Он разделяет данные о записи, используя предоставленный набор столбцов
val df = Seq(
("foo", 1.0), ("bar", 2.0), ("foo", 1.5), ("bar", 2.6)
).toDF("k", "v")
df.write.partitionBy("k").json("/tmp/foo.json")
Это позволяет предикату нажать на чтение для запросов на основе ключа:
val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json")
df1.where($"k" === "bar")
но это не эквивалентно DataFrame.repartition
, В частности, агрегаты, такие как:
val cnts = df1.groupBy($"k").sum()
все еще потребует TungstenExchange
:
cnts.explain
// == Physical Plan ==
// TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93])
// +- TungstenExchange hashpartitioning(k#90,200), None
// +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99])
// +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json
bucketBy
метод вDataFrameWriter
(Spark >= 2.0):
bucketBy
имеет аналогичные приложения, как partitionBy
но доступно только для таблиц (saveAsTable
). Информация о контейнерах может использоваться для оптимизации соединений:
// Temporarily disable broadcast joins
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
df.write.bucketBy(42, "k").saveAsTable("df1")
val df2 = Seq(("A", -1.0), ("B", 2.0)).toDF("k", "v2")
df2.write.bucketBy(42, "k").saveAsTable("df2")
// == Physical Plan ==
// *Project [k#41, v#42, v2#47]
// +- *SortMergeJoin [k#41], [k#46], Inner
// :- *Sort [k#41 ASC NULLS FIRST], false, 0
// : +- *Project [k#41, v#42]
// : +- *Filter isnotnull(k#41)
// : +- *FileScan parquet default.df1[k#41,v#42] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df1], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v:int>
// +- *Sort [k#46 ASC NULLS FIRST], false, 0
// +- *Project [k#46, v2#47]
// +- *Filter isnotnull(k#46)
// +- *FileScan parquet default.df2[k#46,v2#47] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df2], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v2:double>
* Под разметкой разделов я подразумеваю только распределение данных. partitioned
СДР больше не имеет разделителя.
** При условии, что нет раннего прогноза. Если агрегация охватывает только небольшое подмножество столбцов, вероятно, никакого выигрыша не будет.
В Spark < 1.6 Если вы создаете HiveContext
не старая добрая SqlContext
Вы можете использовать HiveQL DISTRIBUTE BY colX...
(гарантирует, что каждый из N редукторов получит непересекающиеся диапазоны x) & CLUSTER BY colX...
(ярлык для Распределить по и Сортировать по), например;
df.registerTempTable("partitionMe")
hiveCtx.sql("select * from partitionMe DISTRIBUTE BY accountId SORT BY accountId, date")
Не уверен, как это вписывается в API Spark DF. Эти ключевые слова не поддерживаются в обычном SqlContext (обратите внимание, что вам не нужно иметь мета-хранилище улья для использования HiveContext)
РЕДАКТИРОВАТЬ: Spark 1.6+ теперь имеет это в родном API DataFrame
Итак, начать с какого-то ответа:) - Вы не можете
Я не эксперт, но насколько я понимаю DataFrames, они не равны rdd и DataFrame не имеет такого понятия, как Partitioner.
Вообще идея DataFrame - предоставить другой уровень абстракции, который сам решает такие проблемы. Запросы в DataFrame преобразуются в логический план, который затем переводится в операции с RDD. Разделение, которое вы предложили, вероятно, будет применено автоматически или, по крайней мере, должно быть.
Если вы не доверяете SparkSQL в том, что он обеспечит какую-то оптимальную работу, вы всегда можете преобразовать DataFrame в RDD[Row], как указано в комментариях.
Я смог сделать это с помощью RDD. Но я не знаю, является ли это приемлемым решением для вас. Когда у вас есть DF, доступный как RDD, вы можете подать заявку repartitionAndSortWithinPartitions
выполнить пользовательское перераспределение данных.
Вот образец, который я использовал:
class DatePartitioner(partitions: Int) extends Partitioner {
override def getPartition(key: Any): Int = {
val start_time: Long = key.asInstanceOf[Long]
Objects.hash(Array(start_time)) % partitions
}
override def numPartitions: Int = partitions
}
myRDD
.repartitionAndSortWithinPartitions(new DatePartitioner(24))
.map { v => v._2 }
.toDF()
.write.mode(SaveMode.Overwrite)
Используйте DataFrame, возвращенный:
yourDF.orderBy(account)
Не существует явного способа использовать partitionBy в DataFrame, только в PairRDD, но когда вы сортируете DataFrame, он будет использовать его в своем LogicalPlan, и это поможет, когда вам нужно будет выполнить вычисления для каждой учетной записи.
Я просто наткнулся на ту же самую проблему с кадром данных, который я хочу разделить по аккаунту. Я предполагаю, что когда вы говорите "хотите разделить данные так, чтобы все транзакции для учетной записи находились в одном и том же разделе Spark", вам нужен масштаб и производительность, но ваш код от этого не зависит (например, использование mapPartitions() и т. д.), верно?