Spark UDAF с ArrayType в качестве проблем с производительностью bufferSchema
Я работаю над UDAF, который возвращает массив элементов.
Входными данными для каждого обновления является кортеж индекса и значения.
UDAF суммирует все значения по одному и тому же индексу.
Пример:
Для ввода (индекс, значение): (2,1), (3,1), (2,3)
должен вернуть (0,0,4,1,...,0)
Логика работает нормально, но у меня есть проблема с методом обновления, моя реализация обновляет только 1 ячейку для каждой строки, но последнее назначение в этом методе фактически копирует весь массив - что является избыточным и чрезвычайно трудоемким.
Одно только это назначение отвечает за 98% времени выполнения моего запроса.
У меня вопрос, как я могу сократить это время? Можно ли назначить 1 значение в массиве буферов без необходимости замены всего буфера?
PS: я работаю с Spark 1.6 и не могу обновить его в ближайшее время, поэтому, пожалуйста, придерживайтесь решения, которое будет работать с этой версией.
class SumArrayAtIndexUDAF() extends UserDefinedAggregateFunction{
val bucketSize = 1000
def inputSchema: StructType = StructType(StructField("index",LongType) :: StructField("value",LongType) :: Nil)
def dataType: DataType = ArrayType(LongType)
def deterministic: Boolean = true
def bufferSchema: StructType = {
StructType(
StructField("buckets", ArrayType(LongType)) :: Nil
)
}
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = new Array[Long](bucketSize)
}
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
val index = input.getLong(0)
val value = input.getLong(1)
val arr = buffer.getAs[mutable.WrappedArray[Long]](0)
buffer(0) = arr // TODO THIS TAKES WAYYYYY TOO LONG - it actually copies the entire array for every call to this method (which essentially updates only 1 cell)
}
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
val arr1 = buffer1.getAs[mutable.WrappedArray[Long]](0)
val arr2 = buffer2.getAs[mutable.WrappedArray[Long]](0)
for(i <- arr1.indices){
arr1.update(i, arr1(i) + arr2(i))
}
buffer1(0) = arr1
}
override def evaluate(buffer: Row): Any = {
buffer.getAs[mutable.WrappedArray[Long]](0)
}
}
1 ответ
TL;DR Либо не используйте UDAF, либо используйте примитивные типы вместо ArrayType
,
Без UserDefinedFunction
Оба решения должны пропустить дорогостоящее жонглирование между внутренним и внешним представлением.
Используя стандартные агрегаты и pivot
Это использует стандартные агрегации SQL. Несмотря на внутреннюю оптимизацию, это может быть дорого, если количество ключей и размер массива растут.
Учитывая вход:
val df = Seq((1, 2, 1), (1, 3, 1), (1, 2, 3)).toDF("id", "index", "value")
Вы можете:
import org.apache.spark.sql.functions.{array, coalesce, col, lit}
val nBuckets = 10
@transient val values = array(
0 until nBuckets map (c => coalesce(col(c.toString), lit(0))): _*
)
df
.groupBy("id")
.pivot("index", 0 until nBuckets)
.sum("value")
.select($"id", values.alias("values"))
+---+--------------------+
| id| values|
+---+--------------------+
| 1|[0, 0, 4, 1, 0, 0...|
+---+--------------------+
Использование RDD API с combineByKey
/ aggregateByKey
,
Обычная старая byKey
агрегация с изменяемым буфером. Никаких наворотов, но должен работать достаточно хорошо с широким диапазоном входов. Если вы подозреваете, что входные данные редки, вы можете рассмотреть более эффективное промежуточное представление, например, изменяемое Map
,
rdd
.aggregateByKey(Array.fill(nBuckets)(0L))(
{ case (acc, (index, value)) => { acc(index) += value; acc }},
(acc1, acc2) => { for (i <- 0 until nBuckets) acc1(i) += acc2(i); acc1}
).toDF
+---+--------------------+
| _1| _2|
+---+--------------------+
| 1|[0, 0, 4, 1, 0, 0...|
+---+--------------------+
С помощью UserDefinedFunction
с примитивными типами
Насколько я понимаю внутренности, узкое место производительности ArrayConverter.toCatalystImpl
,
Похоже, он вызывается для каждого звонка MutableAggregationBuffer.update
и, в свою очередь, выделяет новые GenericArrayData
для каждого Row
,
Если мы переопределим bufferSchema
как:
def bufferSchema: StructType = {
StructType(
0 to nBuckets map (i => StructField(s"x$i", LongType))
)
}
и то и другое update
а также merge
может быть выражено как простые замены примитивных значений в буфере. Цепочка вызовов останется довольно длинной, но она не потребует копий / преобразований и сумасшедших распределений. Опуская null
проверяет, что вам нужно что-то похожее на
val index = input.getLong(0)
buffer.update(index, buffer.getLong(index) + input.getLong(1))
а также
for(i <- 0 to nBuckets){
buffer1.update(i, buffer1.getLong(i) + buffer2.getLong(i))
}
соответственно.
в заключение evaluate
должен взять Row
и преобразовать его в вывод Seq
:
for (i <- 0 to nBuckets) yield buffer.getLong(i)
Обратите внимание, что в этой реализации возможное узкое место merge
, Хотя это не должно создавать каких-либо новых проблем с производительностью, с M блоками, каждый вызов merge
это O (M).
С K уникальными ключами и P разделами он будет вызываться M * K раз в худшем случае, когда каждый ключ встречается по крайней мере один раз на каждом разделе. Это эффективно увеличивает соучастие merge
компонент к О (М * Н * К).
В общем, вы ничего не можете с этим поделать. Однако, если вы сделаете определенные предположения о распределении данных (данные разрежены, распределение ключей равномерно), вы можете немного укорочить вещи и сначала перемешать:
df
.repartition(n, $"key")
.groupBy($"key")
.agg(SumArrayAtIndexUDAF($"index", $"value"))
Если предположения будут выполнены, он должен:
- Противоположным образом уменьшите размер перемешивания, перетасовывая разреженные пары вместо плотных массивов
Rows
, - Агрегируйте данные, используя только обновления (каждый O (1)), возможно, касаясь только подмножества индексов.
Однако, если одно или оба предположения не будут удовлетворены, вы можете ожидать, что размер перемешивания увеличится, а количество обновлений останется прежним. В то же время перекосы данных могут сделать вещи еще хуже, чем в update
- shuffle
- merge
сценарий.
С помощью Aggregator
с "сильно" набрал Dataset
:
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.{Encoder, Encoders}
class SumArrayAtIndex[I](f: I => (Int, Long))(bucketSize: Int) extends Aggregator[I, Array[Long], Seq[Long]]
with Serializable {
def zero = Array.fill(bucketSize)(0L)
def reduce(acc: Array[Long], x: I) = {
val (i, v) = f(x)
acc(i) += v
acc
}
def merge(acc1: Array[Long], acc2: Array[Long]) = {
for {
i <- 0 until bucketSize
} acc1(i) += acc2(i)
acc1
}
def finish(acc: Array[Long]) = acc.toSeq
def bufferEncoder: Encoder[Array[Long]] = Encoders.kryo[Array[Long]]
def outputEncoder: Encoder[Seq[Long]] = ExpressionEncoder()
}
который может быть использован, как показано ниже
val ds = Seq((1, (1, 3L)), (1, (2, 5L)), (1, (0, 1L)), (1, (4, 6L))).toDS
ds
.groupByKey(_._1)
.agg(new SumArrayAtIndex[(Int, (Int, Long))](_._2)(10).toColumn)
.show(false)
+-----+-------------------------------+
|value|SumArrayAtIndex(scala.Tuple2) |
+-----+-------------------------------+
|1 |[1, 3, 5, 0, 6, 0, 0, 0, 0, 0] |
|2 |[0, 11, 0, 0, 0, 0, 0, 0, 0, 0]|
+-----+-------------------------------+