Как нарезать и суммировать элементы столбца массива?

Я бы хотел sum (или выполнять другие агрегатные функции тоже) в столбце массива с использованием SparkSQL.

У меня есть столик

+-------+-------+---------------------------------+
|dept_id|dept_nm|                      emp_details|
+-------+-------+---------------------------------+
|     10|Finance|        [100, 200, 300, 400, 500]|
|     20|     IT|                [10, 20, 50, 100]|
+-------+-------+---------------------------------+

Я хотел бы суммировать значения этого emp_details колонна

Ожидаемый запрос:

sqlContext.sql("select sum(emp_details) from mytable").show

Ожидаемый результат

1500
180

Также я должен быть в состоянии суммировать элементы диапазона, например:

sqlContext.sql("select sum(slice(emp_details,0,3)) from mytable").show

результат

600
80

при выполнении суммы для типа Array, как и ожидалось, это показывает, что сумма ожидает, что аргумент будет числовым типом, а не типом массива.

Я думаю, что для этого нам нужно создать UDF. но как?

Буду ли я сталкиваться с какими-либо хитами производительности с UDF? и есть ли другое решение, кроме UDF?

5 ответов

Spark 2.4.0

Начиная с Spark 2.4, Spark SQL поддерживает функции высшего порядка, предназначенные для управления сложными структурами данных, включая массивы.

"Современное" решение будет следующим:

scala> input.show(false)
+-------+-------+-------------------------+
|dept_id|dept_nm|emp_details              |
+-------+-------+-------------------------+
|10     |Finance|[100, 200, 300, 400, 500]|
|20     |IT     |[10, 20, 50, 100]        |
+-------+-------+-------------------------+

input.createOrReplaceTempView("mytable")

val sqlText = "select dept_id, dept_nm, aggregate(emp_details, 0, (value, acc) -> value + acc) as sum from mytable"
scala> sql(sqlText).show
+-------+-------+----+
|dept_id|dept_nm| sum|
+-------+-------+----+
|     10|Finance|1500|
|     20|     IT| 180|
+-------+-------+----+

Вы можете найти хорошее чтение по функциям высшего порядка в следующих статьях и видео:

  1. Введение новых встроенных функций и функций высшего порядка для сложных типов данных в Apache Spark 2.4
  2. Работа с вложенными данными с использованием функций высшего порядка в SQL для блоков данных
  3. Введение в функции высшего порядка в Spark SQL с Херманом ван Ховеллом (Databricks)

Spark 2.3.2 и более ранние

ОТКАЗ ОТ ОТВЕТСТВЕННОСТИ Я бы не рекомендовал этот подход (хотя он получил наибольшее количество голосов) из-за десериализации, которую выполняет Spark SQL для выполнения Dataset.map, Запрос заставляет Spark десериализовать данные и загрузить их в JVM (из областей памяти, которые управляются Spark вне JVM). Это неизбежно приведет к более частым сборкам мусора и, следовательно, ухудшит производительность.

Одним из решений будет использование Dataset решение, в котором комбинация Spark SQL и Scala может показать свою мощь.

scala> val inventory = Seq(
     |   (10, "Finance", Seq(100, 200, 300, 400, 500)),
     |   (20, "IT", Seq(10, 20, 50, 100))).toDF("dept_id", "dept_nm", "emp_details")
inventory: org.apache.spark.sql.DataFrame = [dept_id: int, dept_nm: string ... 1 more field]

// I'm too lazy today for a case class
scala> inventory.as[(Long, String, Seq[Int])].
  map { case (deptId, deptName, details) => (deptId, deptName, details.sum) }.
  toDF("dept_id", "dept_nm", "sum").
  show
+-------+-------+----+
|dept_id|dept_nm| sum|
+-------+-------+----+
|     10|Finance|1500|
|     20|     IT| 180|
+-------+-------+----+

Я оставляю часть среза как упражнение, поскольку оно одинаково просто.

Начиная с Spark 2.4 вы можете нарезать с slice функция:

import org.apache.spark.sql.functions.slice

val df = Seq(
  (10, "Finance", Seq(100, 200, 300, 400, 500)),
  (20, "IT", Seq(10, 20, 50, 100))
).toDF("dept_id", "dept_nm", "emp_details")

val dfSliced = df.withColumn(
   "emp_details_sliced",
   slice($"emp_details", 1, 3)
)

dfSliced.show(false)
+-------+-------+-------------------------+------------------+
|dept_id|dept_nm|emp_details              |emp_details_sliced|
+-------+-------+-------------------------+------------------+
|10     |Finance|[100, 200, 300, 400, 500]|[100, 200, 300]   |
|20     |IT     |[10, 20, 50, 100]        |[10, 20, 50]      |
+-------+-------+-------------------------+------------------+

и суммировать массивы с aggregate:

dfSliced.selectExpr(
  "*", 
  "aggregate(emp_details, 0, (x, y) -> x + y) as details_sum",  
  "aggregate(emp_details_sliced, 0, (x, y) -> x + y) as details_sliced_sum"
).show
+-------+-------+--------------------+------------------+-----------+------------------+
|dept_id|dept_nm|         emp_details|emp_details_sliced|details_sum|details_sliced_sum|
+-------+-------+--------------------+------------------+-----------+------------------+
|     10|Finance|[100, 200, 300, 4...|   [100, 200, 300]|       1500|               600|
|     20|     IT|   [10, 20, 50, 100]|      [10, 20, 50]|        180|                80|
+-------+-------+--------------------+------------------+-----------+------------------+

Возможный подход это использовать explode() на ваше Array столбец и, следовательно, агрегировать вывод по уникальному ключу. Например:

import sqlContext.implicits._
import org.apache.spark.sql.functions._

(mytable
  .withColumn("emp_sum",
    explode($"emp_details"))
  .groupBy("dept_nm")
  .agg(sum("emp_sum")).show)
+-------+------------+
|dept_nm|sum(emp_sum)|
+-------+------------+
|Finance|        1500|
|     IT|         180|
+-------+------------+

Чтобы выбрать только конкретные значения в вашем массиве, мы можем работать с ответом на связанный вопрос и применять его с небольшими изменениями:

val slice = udf((array : Seq[Int], from : Int, to : Int) => array.slice(from,to))

(mytable
  .withColumn("slice", 
    slice($"emp_details", 
      lit(0), 
      lit(3)))
  .withColumn("emp_sum",
    explode($"slice"))
  .groupBy("dept_nm")
  .agg(sum("emp_sum")).show)
+-------+------------+
|dept_nm|sum(emp_sum)|
+-------+------------+
|Finance|         600|
|     IT|          80|
+-------+------------+

Данные:

val data = Seq((10, "Finance", Array(100,200,300,400,500)),
               (20, "IT", Array(10,20,50,100)))
val mytable = sc.parallelize(data).toDF("dept_id", "dept_nm","emp_details")

Вот альтернатива ответу Mtoto без использования groupBy (Я действительно не знаю, какой из них самый быстрый: UDF, mtoto-решение или мой, комментарии приветствуются)

Вы бы повлияли на производительность при использовании UDF, в общем. Есть ответ, который вы, возможно, захотите прочитать, и этот ресурс хорошо читается в UDF.

Теперь для вашей проблемы, вы можете избежать использования UDF. Я бы использовал Column выражение, генерируемое с помощью логики Scala.

данные:

val df = Seq((10, "Finance", Array(100,200,300,400,500)),
                  (20, "IT", Array(10,  20, 50,100)))
          .toDF("dept_id", "dept_nm","emp_details")

Вам нужна хитрость, чтобы пройти ArrayType, вы можете немного поиграть с решением, чтобы обнаружить различные проблемы (см. правку внизу для slice часть). Вот мое предложение, но вы можете найти лучше. Сначала вы берете максимальную длину

val maxLength = df.select(size('emp_details).as("l")).groupBy().max("l").first.getInt(0)

Затем вы используете его, тестируя, когда у вас есть более короткий массив

val sumArray = (1 until maxLength)
      .map(i => when(size('emp_details) > i,'emp_details(i)).otherwise(lit(0)))
      .reduce(_ + _)
      .as("sumArray")

val res = df
  .select('dept_id,'dept_nm,'emp_details,sumArray)

результат:

+-------+-------+--------------------+--------+
|dept_id|dept_nm|         emp_details|sumArray|
+-------+-------+--------------------+--------+
|     10|Finance|[100, 200, 300, 4...|    1500|
|     20|     IT|   [10, 20, 50, 100]|     180|
+-------+-------+--------------------+--------+

Я советую вам посмотреть на sumArray чтобы понять, что он делает.

Изменить: Конечно, я снова прочитал только половину вопроса... Но если вы хотите изменить элементы для суммирования, вы можете увидеть, что это становится очевидным с этим решением (то есть вам не нужна функция среза), просто поменяй (0 until maxLength) с диапазоном индекса вам нужно:

def sumArray(from: Int, max: Int) = (from until max)
      .map(i => when(size('emp_details) > i,'emp_details(i)).otherwise(lit(0)))
      .reduce(_ + _)
      .as("sumArray")

Основываясь на потрясающем ответе zero323; если у вас есть массив целых чисел типа Long, то есть BIGINT, вам нужно изменить начальное значение с 0 на BIGINT(0), как описано в первом абзаце здесь, чтобы у вас было

dfSliced.selectExpr(
  "*", 
  "aggregate(emp_details, BIGINT(0), (x, y) -> x + y) as details_sum",  
  "aggregate(emp_details_sliced, BIGINT(0), (x, y) -> x + y) as details_sliced_sum"
).show

Редкий путь отсутствует, поэтому позвольте мне добавить его.

val df = Seq((10, "Finance", Array(100,200,300,400,500)),(20, "IT", Array(10,20,50,100))).toDF("dept_id", "dept_nm","emp_details")

import scala.collection.mutable._

val rdd1 = df.rdd.map( x=> {val p = x.getAs[mutable.WrappedArray[Int]]("emp_details").toArray; Row.merge(x,Row(p.sum,p.slice(0,2).sum)) })

spark.createDataFrame(rdd1,df.schema.add(StructField("sumArray",IntegerType)).add(StructField("sliceArray",IntegerType))).show(false)

Выход:

+-------+-------+-------------------------+--------+----------+
|dept_id|dept_nm|emp_details              |sumArray|sliceArray|
+-------+-------+-------------------------+--------+----------+
|10     |Finance|[100, 200, 300, 400, 500]|1500    |300       |
|20     |IT     |[10, 20, 50, 100]        |180     |30        |
+-------+-------+-------------------------+--------+----------+
Другие вопросы по тегам