Spark группировки и пользовательские агрегации

У меня есть данные, как показано ниже,

n1  d1  un1 mt1 1
n1  d1  un1 mt2 2
n1  d1  un1 mt3 3
n1  d1  un1 mt4 4
n1  d2  un1 mt1 3
n1  d2  un1 mt3 3
n1  d2  un1 mt4 4
n1  d2  un1 mt5 6
n1  d2  un1 mt2 3

я хочу получить вывод, как показано ниже

n1 d1 un1 0.75
n1 d2 un1 1.5

я делаю группирование по 1-му, 2-му и 3-му столбцам и для 4-го столбца следуйте приведенной ниже формуле,4-й столбец = внутри группы, (mt1+mt2)/mt4

Я пытаюсь сделать то же самое с spark DF, предполагая, что данные находятся в фрейме данных с именем столбца как n, d, un, mt, r, я пытаюсь это сделать.

sqlContext.udf.register("aggUDF",(v:List(mt,r))=> ?)
val b = a.groupBy("n","d","un").agg(callUdf("aggUDF",List((mt,r)) should go here))

2 ответа

Если я правильно понимаю, вы сначала хотите вычислить сумму строк с mt1 и mt2 и разделить на сумму строк в mt4 для каждого отдельного n1,d1, un1.

Хотя можно работать с пользовательскими функциями агрегирования, как указано выше, вы также можете использовать небольшую грубую силу (я покажу это в pyspark, но вы сможете легко конвертировать в scala).

Предположим, ваш исходный фрейм данных называется df, а столбцы расположены в следующем порядке: n,d,un,mt,r.

Сначала создайте новый столбец для каждого из mt1, mt2 и mt4 следующим образом:

from pyspark.sql import functions as F
newdf = df.withColumn("mt1", when(df.mt == "mt1", df.r).otherwise(0).alias("mt1"))
newdf = newdf .withColumn("mt2", when(newdf.mt == "mt2", newdf .r).otherwise(0).alias("mt2"))
newdf = newdf .withColumn("mt4", when(newdf.mt == "mt4", newdf .r).otherwise(0).alias("mt4"))

Теперь выполните группирование по первым 3 значениям, а в качестве агрегации - сумму новых 3 значений.

aggregated = newdf.groupBy(["n","d","n"]).agg(F.sum(newdf.mt1).alias("sum_mt1"),F.sum(newdf.mt2).alias("sum_mt2"), F.sum(newdf.mt4).alias("sum_mt4"))

Теперь просто сделайте расчет:

final = aggregated.withColumn("res", (aggregated.sum_mt1 +  aggregated.sum_mt2) / aggregated.sum_mt4)    

Не самое элегантное решение, но оно может работать на вас...

В настоящее время (Spark 1.4) нет поддержки пользовательских функций агрегирования. Однако вы можете использовать Hive UDAF. Вы можете увидеть пример пользовательской функции агрегирования Hive (UDAF) в Spark здесь.

Другие вопросы по тегам