spark - вычисление среднего значения в 2 или более столбцах и добавление нового столбца в каждой строке

Предположим, у меня есть набор данных /Dataframe со следующим содержимым:

name, marks1, marks2
Alice, 10, 20
Bob, 20, 30

Я хочу добавить новый столбец, который должен иметь среднее значение столбцов B и C.

Ожидаемый результат:-

name, marks1, marks2, Result(Avg)
Alice, 10, 20, 15
Bob, 20, 30, 25

для суммирования или любой другой арифметической операции, которую я использую df.withColumn("xyz", $"marks1"+$"marks2"), Я не могу найти аналогичный способ для среднего. Пожалуйста помоги.

Дополнительно: - Количество столбцов не фиксировано. Как иногда это может быть в среднем 2 столбца, иногда 3 или даже больше. Поэтому я хочу общий код, который должен работать.

2 ответа

Решение

Один из самых простых и оптимизированных способов - создать список столбцов столбцов меток и использовать его с withColumn как

pyspark

from pyspark.sql.functions import col

marksColumns = [col('marks1'), col('marks2')]

averageFunc = sum(x for x in marksColumns)/len(marksColumns)

df.withColumn('Result(Avg)', averageFunc).show(truncate=False)

и вы должны получить

+-----+------+------+-----------+
|name |marks1|marks2|Result(Avg)|
+-----+------+------+-----------+
|Alice|10    |20    |15.0       |
|Bob  |20    |30    |25.0       |
+-----+------+------+-----------+

искровой Scala

в scala процесс почти такой же, как в python выше

import org.apache.spark.sql.functions.{col, lit}

val marksColumns = Array(col("marks1"), col("marks2"))

val averageFunc = marksColumns.foldLeft(lit(0)){(x, y) => x+y}/marksColumns.length

df.withColumn("Result(Avg)", averageFunc).show(false)

который должен дать вам тот же вывод, что и в pyspark

Я надеюсь, что ответ полезен

Это так же просто, как использование пользовательских функций. Создав определенный UDF для работы со средним числом столбцов, вы сможете использовать его столько раз, сколько захотите.

питон

В этом фрагменте я создаю UDF, который принимает массив столбцов и вычисляет его среднее значение.

from pyspark.sql.functions import udf, array
from pyspark.sql.types import DoubleType

avg_cols = udf(lambda array: sum(array)/len(array), DoubleType())

df.withColumn("average", avg_cols(array("marks1", "marks2"))).show()

Выход:

+-----+------+------+--------+
| name|marks1|marks2| average|
+-----+------+------+--------+
|Alice|    10|    20|    15.0|
|  Bob|    20|    30|    25.0|
+-----+------+------+--------+

Scala

При помощи Scala API вы должны обрабатывать выбранные столбцы как строку. Вам просто нужно выбрать столбцы с помощью Spark struct функция.

import org.apache.spark.sql.functions._
import spark.implicits._
import scala.util.Try

def average = udf((row: Row) => {
  val values = row.toSeq.map(x => Try(x.toString.toDouble).toOption).filter(_.isDefined).map(_.get)
  if(values.nonEmpty) values.sum / values.length else 0.0
})

df.withColumn("average", average(struct($"marks1", $"marks2"))).show()

Как вы можете видеть, я преобразую все значения в Double с Try, так что если значение не может быть приведено, оно не будет генерировать никаких исключений, выполняя усреднение только для тех столбцов, которые определены.

И это все:)

Другие вопросы по тегам