Когда происходит слияние в пользовательских определяющих функциях UDAF в Spark?

Я хочу знать, при каких обстоятельствах Spark будет выполнять слияние как часть функции UDAF.

Мотивация: я использую множество функций UDAF над окном в моем проекте Spark. Часто я хочу ответить на такой вопрос:

Сколько раз транзакция по кредитной карте была сделана в той же стране, что и текущая транзакция в окне 30 дней?

Окно начнется с текущей транзакции, но не будет включено в счетчик. Требуется значение текущей транзакции, чтобы узнать, какую страну рассчитывать за последние 30 дней.

val rollingWindow = Window
      .partitionBy(partitionByColumn)
      .orderBy(orderByColumn.desc)
      .rangeBetween(0, windowSize)

df.withColumn(
  outputColumnName,
  customUDAF(inputColumn, orderByColumn).over(rollingWindow))

Я написал свой customUDAF для подсчета. Я всегда использую .orderBy(orderByColumn.desc) и благодаря .desc текущая транзакция отображается первой в окне во время расчета.

Функции UDAF требуют реализации merge функция, которая объединяет два промежуточных буфера агрегации в параллельных вычислениях. Если произойдет слияние, мой current transaction может не совпадать для разных буферов и результаты UDAF будут неверными.

Я написал функцию UDAF, которая подсчитывает количество слияний в моем наборе данных и сохраняет только первую транзакцию в окне для сравнения с текущей транзакцией.

 class FirstUDAF() extends UserDefinedAggregateFunction {

  def inputSchema = new StructType().add("x", StringType)
    .add("y", StringType)

  def bufferSchema = new StructType()
    .add("first", StringType)
    .add("numMerge", IntegerType)

  def dataType = new StructType()
    .add("firstCode", StringType)
    .add("numMerge", IntegerType)

  def deterministic = true

  def initialize(buffer: MutableAggregationBuffer) = {
    buffer(0) = ""
    buffer(1) = 1
  }

  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (buffer.getString(0) == "")
      buffer(0) = input.getString(0)

  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
    buffer1(1) = buffer1.getInt(1) + buffer2.getInt(1)
  }

  def evaluate(buffer: Row) = buffer
}

Когда я запускаю его с помощью spark 2.0.1 на локальном мастере с 16 процессорами, слияний никогда не бывает, и первая транзакция в окне всегда является текущей транзакцией. Это то, что я хочу. В ближайшем будущем я буду запускать свой код в наборе данных большего размера в x100 и в реальном распределенном кластере Spark и хочу знать, могут ли там произойти слияния.

Вопросы:

  • При каких обстоятельствах / условиях слияния происходят в UDAF?
  • Есть ли у Windows порядок с слияниями?
  • Можно ли сказать Spark не делать слияния?

1 ответ

Решение

При каких обстоятельствах / условиях слияния происходят в UDAF?

merge вызывается, когда частичные приложения функции агрегирования ("агрегация на стороне карты") объединяются после тасования ("уменьшение агрегации на стороне").

Есть ли у Windows порядок с слияниями?

В текущей реализации никогда. На данный момент оконные функции просто причудливы groupByKeyи нет частичной агрегации. Это, конечно, детали реализации и могут быть изменены без предварительного уведомления в будущем.

Можно ли сказать Spark не делать слияния?

Это не. Однако, если данные уже разделены по ключу агрегирования, нет необходимости merge и только combine используется.

В заключение:

Сколько раз транзакция по кредитной карте была сделана в той же стране, что и текущая транзакция в окне 30 дней?

не требует UDAFs или оконные функции. Я бы, вероятно, создал бы падающие окна с o.a.s.sql.functions.window, агрегировать по пользователю, стране и окну и присоединиться обратно с вводом.

Другие вопросы по тегам