Агрегирование нескольких столбцов с пользовательской функцией в искре
Мне было интересно, если есть какой-то способ указать пользовательскую функцию агрегирования для фреймов данных искры по нескольким столбцам.
У меня есть такая таблица типа (имя, предмет, цена):
john | tomato | 1.99
john | carrot | 0.45
bill | apple | 0.99
john | banana | 1.29
bill | taco | 2.59
чтобы:
Я хотел бы объединить элемент и его стоимость для каждого человека в такой список:
john | (tomato, 1.99), (carrot, 0.45), (banana, 1.29)
bill | (apple, 0.99), (taco, 2.59)
Это возможно в кадрах данных? Я недавно узнал о collect_list
но, похоже, работает только для одного столбца.
4 ответа
Самый простой способ сделать это как DataFrame
это сначала собрать два списка, а затем использовать UDF
в zip
два списка вместе. Что-то вроде:
import org.apache.spark.sql.functions.{collect_list, udf}
import sqlContext.implicits._
val zipper = udf[Seq[(String, Double)], Seq[String], Seq[Double]](_.zip(_))
val df = Seq(
("john", "tomato", 1.99),
("john", "carrot", 0.45),
("bill", "apple", 0.99),
("john", "banana", 1.29),
("bill", "taco", 2.59)
).toDF("name", "food", "price")
val df2 = df.groupBy("name").agg(
collect_list(col("food")) as "food",
collect_list(col("price")) as "price"
).withColumn("food", zipper(col("food"), col("price"))).drop("price")
df2.show(false)
# +----+---------------------------------------------+
# |name|food |
# +----+---------------------------------------------+
# |john|[[tomato,1.99], [carrot,0.45], [banana,1.29]]|
# |bill|[[apple,0.99], [taco,2.59]] |
# +----+---------------------------------------------+
Рассмотрите возможность использования struct
Функция для группировки столбцов перед сборкой в виде списка:
import org.apache.spark.sql.functions.{collect_list, struct}
import sqlContext.implicits._
val df = Seq(
("john", "tomato", 1.99),
("john", "carrot", 0.45),
("bill", "apple", 0.99),
("john", "banana", 1.29),
("bill", "taco", 2.59)
).toDF("name", "food", "price")
df.groupBy($"name")
.agg(collect_list(struct($"food", $"price")).as("foods"))
.show(false)
Выходы:
+----+---------------------------------------------+
|name|foods |
+----+---------------------------------------------+
|john|[[tomato,1.99], [carrot,0.45], [banana,1.29]]|
|bill|[[apple,0.99], [taco,2.59]] |
+----+---------------------------------------------+
Может быть, лучший способ, чем zip
Функция (поскольку UDF и UDAF очень плохо влияют на производительность) состоит в том, чтобы обернуть два столбца в Struct
,
Это, вероятно, будет работать так же:
df.select('name, struct('food, 'price).as("tuple"))
.groupBy('name)
.agg(collect_list('tuple).as("tuples"))
На ваш взгляд, collect_list, похоже, работает только для одного столбца: для того, чтобы collect_list работал с несколькими столбцами, вам нужно будет обернуть столбцы, которые вы хотите как агрегат, в структуру. Например:
val aggregatedData = df.groupBy("name").agg(collect_list(struct("item", "price")) as("food"))
aggregatedData.show
+----+------------------------------------------------+
|name|foods |
+----+------------------------------------------------+
|john|[[tomato, 1.99], [carrot, 0.45], [banana, 1.29]]|
|bill|[[apple, 0.99], [taco, 2.59]] |
+----+------------------------------------------------+
Вот вариант, преобразуя фрейм данных в СДР карты, а затем вызвать groupByKey
в теме. Результатом будет список пар ключ-значение, где значение - это список кортежей.
df.show
+----+------+----+
| _1| _2| _3|
+----+------+----+
|john|tomato|1.99|
|john|carrot|0.45|
|bill| apple|0.99|
|john|banana|1.29|
|bill| taco|2.59|
+----+------+----+
val tuples = df.map(row => row(0) -> (row(1), row(2)))
tuples: org.apache.spark.rdd.RDD[(Any, (Any, Any))] = MapPartitionsRDD[102] at map at <console>:43
tuples.groupByKey().map{ case(x, y) => (x, y.toList) }.collect
res76: Array[(Any, List[(Any, Any)])] = Array((bill,List((apple,0.99), (taco,2.59))), (john,List((tomato,1.99), (carrot,0.45), (banana,1.29))))