Spark группировки и пользовательские агрегации
У меня есть данные, как показано ниже,
n1 d1 un1 mt1 1
n1 d1 un1 mt2 2
n1 d1 un1 mt3 3
n1 d1 un1 mt4 4
n1 d2 un1 mt1 3
n1 d2 un1 mt3 3
n1 d2 un1 mt4 4
n1 d2 un1 mt5 6
n1 d2 un1 mt2 3
я хочу получить вывод, как показано ниже
n1 d1 un1 0.75
n1 d2 un1 1.5
я делаю группирование по 1-му, 2-му и 3-му столбцам и для 4-го столбца следуйте приведенной ниже формуле,4-й столбец = внутри группы, (mt1+mt2)/mt4
Я пытаюсь сделать то же самое с spark DF, предполагая, что данные находятся в фрейме данных с именем столбца как n, d, un, mt, r, я пытаюсь это сделать.
sqlContext.udf.register("aggUDF",(v:List(mt,r))=> ?)
val b = a.groupBy("n","d","un").agg(callUdf("aggUDF",List((mt,r)) should go here))
2 ответа
Если я правильно понимаю, вы сначала хотите вычислить сумму строк с mt1 и mt2 и разделить на сумму строк в mt4 для каждого отдельного n1,d1, un1.
Хотя можно работать с пользовательскими функциями агрегирования, как указано выше, вы также можете использовать небольшую грубую силу (я покажу это в pyspark, но вы сможете легко конвертировать в scala).
Предположим, ваш исходный фрейм данных называется df, а столбцы расположены в следующем порядке: n,d,un,mt,r.
Сначала создайте новый столбец для каждого из mt1, mt2 и mt4 следующим образом:
from pyspark.sql import functions as F
newdf = df.withColumn("mt1", when(df.mt == "mt1", df.r).otherwise(0).alias("mt1"))
newdf = newdf .withColumn("mt2", when(newdf.mt == "mt2", newdf .r).otherwise(0).alias("mt2"))
newdf = newdf .withColumn("mt4", when(newdf.mt == "mt4", newdf .r).otherwise(0).alias("mt4"))
Теперь выполните группирование по первым 3 значениям, а в качестве агрегации - сумму новых 3 значений.
aggregated = newdf.groupBy(["n","d","n"]).agg(F.sum(newdf.mt1).alias("sum_mt1"),F.sum(newdf.mt2).alias("sum_mt2"), F.sum(newdf.mt4).alias("sum_mt4"))
Теперь просто сделайте расчет:
final = aggregated.withColumn("res", (aggregated.sum_mt1 + aggregated.sum_mt2) / aggregated.sum_mt4)
Не самое элегантное решение, но оно может работать на вас...
В настоящее время (Spark 1.4) нет поддержки пользовательских функций агрегирования. Однако вы можете использовать Hive UDAF. Вы можете увидеть пример пользовательской функции агрегирования Hive (UDAF) в Spark здесь.