Spark UDAF с ArrayType в качестве проблем с производительностью bufferSchema

Я работаю над UDAF, который возвращает массив элементов.

Входными данными для каждого обновления является кортеж индекса и значения.

UDAF суммирует все значения по одному и тому же индексу.

Пример:

Для ввода (индекс, значение): (2,1), (3,1), (2,3)

должен вернуть (0,0,4,1,...,0)

Логика работает нормально, но у меня есть проблема с методом обновления, моя реализация обновляет только 1 ячейку для каждой строки, но последнее назначение в этом методе фактически копирует весь массив - что является избыточным и чрезвычайно трудоемким.

Одно только это назначение отвечает за 98% времени выполнения моего запроса.

У меня вопрос, как я могу сократить это время? Можно ли назначить 1 значение в массиве буферов без необходимости замены всего буфера?

PS: я работаю с Spark 1.6 и не могу обновить его в ближайшее время, поэтому, пожалуйста, придерживайтесь решения, которое будет работать с этой версией.

class SumArrayAtIndexUDAF() extends UserDefinedAggregateFunction{

  val bucketSize = 1000

  def inputSchema: StructType =  StructType(StructField("index",LongType) :: StructField("value",LongType) :: Nil)

  def dataType: DataType = ArrayType(LongType)

  def deterministic: Boolean = true

  def bufferSchema: StructType = {
    StructType(
      StructField("buckets", ArrayType(LongType)) :: Nil  
    )
  }

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = new Array[Long](bucketSize)
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val index = input.getLong(0)
    val value = input.getLong(1)

    val arr = buffer.getAs[mutable.WrappedArray[Long]](0)

    buffer(0) = arr   // TODO THIS TAKES WAYYYYY TOO LONG - it actually copies the entire array for every call to this method (which essentially updates only 1 cell)
  }

    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val arr1 = buffer1.getAs[mutable.WrappedArray[Long]](0)
    val arr2 = buffer2.getAs[mutable.WrappedArray[Long]](0)

    for(i <- arr1.indices){
      arr1.update(i, arr1(i) + arr2(i))
    }

    buffer1(0) = arr1
  }

  override def evaluate(buffer: Row): Any = {
    buffer.getAs[mutable.WrappedArray[Long]](0)
  }
}

1 ответ

Решение

TL;DR Либо не используйте UDAF, либо используйте примитивные типы вместо ArrayType,

Без UserDefinedFunction

Оба решения должны пропустить дорогостоящее жонглирование между внутренним и внешним представлением.

Используя стандартные агрегаты и pivot

Это использует стандартные агрегации SQL. Несмотря на внутреннюю оптимизацию, это может быть дорого, если количество ключей и размер массива растут.

Учитывая вход:

val df = Seq((1, 2, 1), (1, 3, 1), (1, 2, 3)).toDF("id", "index", "value")

Вы можете:

import org.apache.spark.sql.functions.{array, coalesce, col, lit}

val nBuckets = 10
@transient val values = array(
  0 until nBuckets map (c => coalesce(col(c.toString), lit(0))): _*
)

df
  .groupBy("id")
  .pivot("index", 0 until nBuckets)
  .sum("value")
  .select($"id", values.alias("values"))
+---+--------------------+                                                      
| id|              values|
+---+--------------------+
|  1|[0, 0, 4, 1, 0, 0...|
+---+--------------------+

Использование RDD API с combineByKey / aggregateByKey ,

Обычная старая byKey агрегация с изменяемым буфером. Никаких наворотов, но должен работать достаточно хорошо с широким диапазоном входов. Если вы подозреваете, что входные данные редки, вы можете рассмотреть более эффективное промежуточное представление, например, изменяемое Map,

rdd
  .aggregateByKey(Array.fill(nBuckets)(0L))(
    { case (acc, (index, value)) => { acc(index) += value; acc }},
    (acc1, acc2) => { for (i <- 0 until nBuckets) acc1(i) += acc2(i); acc1}
  ).toDF
+---+--------------------+
| _1|                  _2|
+---+--------------------+
|  1|[0, 0, 4, 1, 0, 0...|
+---+--------------------+

С помощью UserDefinedFunction с примитивными типами

Насколько я понимаю внутренности, узкое место производительности ArrayConverter.toCatalystImpl,

Похоже, он вызывается для каждого звонка MutableAggregationBuffer.update и, в свою очередь, выделяет новые GenericArrayData для каждого Row,

Если мы переопределим bufferSchema как:

def bufferSchema: StructType = {
  StructType(
    0 to nBuckets map (i => StructField(s"x$i", LongType))
  )
}

и то и другое update а также merge может быть выражено как простые замены примитивных значений в буфере. Цепочка вызовов останется довольно длинной, но она не потребует копий / преобразований и сумасшедших распределений. Опуская null проверяет, что вам нужно что-то похожее на

val index = input.getLong(0)
buffer.update(index, buffer.getLong(index) + input.getLong(1))

а также

for(i <- 0 to nBuckets){
  buffer1.update(i, buffer1.getLong(i) + buffer2.getLong(i))
}

соответственно.

в заключение evaluate должен взять Row и преобразовать его в вывод Seq:

 for (i <- 0 to nBuckets)  yield buffer.getLong(i)

Обратите внимание, что в этой реализации возможное узкое место merge, Хотя это не должно создавать каких-либо новых проблем с производительностью, с M блоками, каждый вызов merge это O (M).

С K уникальными ключами и P разделами он будет вызываться M * K раз в худшем случае, когда каждый ключ встречается по крайней мере один раз на каждом разделе. Это эффективно увеличивает соучастие merge компонент к О (М * Н * К).

В общем, вы ничего не можете с этим поделать. Однако, если вы сделаете определенные предположения о распределении данных (данные разрежены, распределение ключей равномерно), вы можете немного укорочить вещи и сначала перемешать:

df
  .repartition(n, $"key")
  .groupBy($"key")
  .agg(SumArrayAtIndexUDAF($"index", $"value"))

Если предположения будут выполнены, он должен:

  • Противоположным образом уменьшите размер перемешивания, перетасовывая разреженные пары вместо плотных массивов Rows,
  • Агрегируйте данные, используя только обновления (каждый O (1)), возможно, касаясь только подмножества индексов.

Однако, если одно или оба предположения не будут удовлетворены, вы можете ожидать, что размер перемешивания увеличится, а количество обновлений останется прежним. В то же время перекосы данных могут сделать вещи еще хуже, чем в update - shuffle - merge сценарий.

С помощью Aggregator с "сильно" набрал Dataset:

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.{Encoder, Encoders}

class SumArrayAtIndex[I](f: I => (Int, Long))(bucketSize: Int)  extends Aggregator[I, Array[Long], Seq[Long]]
    with Serializable {
  def zero = Array.fill(bucketSize)(0L)
  def reduce(acc: Array[Long], x: I) = {
    val (i, v) = f(x)
    acc(i) += v
    acc
  }

  def merge(acc1: Array[Long], acc2: Array[Long]) = {
    for {
      i <- 0 until bucketSize
    } acc1(i) += acc2(i)
    acc1
  }

  def finish(acc: Array[Long]) = acc.toSeq

  def bufferEncoder: Encoder[Array[Long]] = Encoders.kryo[Array[Long]]
  def outputEncoder: Encoder[Seq[Long]] = ExpressionEncoder()
}

который может быть использован, как показано ниже

val ds = Seq((1, (1, 3L)), (1, (2, 5L)), (1, (0, 1L)), (1, (4, 6L))).toDS

ds
  .groupByKey(_._1)
  .agg(new SumArrayAtIndex[(Int, (Int, Long))](_._2)(10).toColumn)
  .show(false)
+-----+-------------------------------+
|value|SumArrayAtIndex(scala.Tuple2)  |
+-----+-------------------------------+
|1    |[1, 3, 5, 0, 6, 0, 0, 0, 0, 0] |
|2    |[0, 11, 0, 0, 0, 0, 0, 0, 0, 0]|
+-----+-------------------------------+
Другие вопросы по тегам