spark - вычисление среднего значения в 2 или более столбцах и добавление нового столбца в каждой строке
Предположим, у меня есть набор данных /Dataframe со следующим содержимым:
name, marks1, marks2
Alice, 10, 20
Bob, 20, 30
Я хочу добавить новый столбец, который должен иметь среднее значение столбцов B и C.
Ожидаемый результат:-
name, marks1, marks2, Result(Avg)
Alice, 10, 20, 15
Bob, 20, 30, 25
для суммирования или любой другой арифметической операции, которую я использую df.withColumn("xyz", $"marks1"+$"marks2")
, Я не могу найти аналогичный способ для среднего. Пожалуйста помоги.
Дополнительно: - Количество столбцов не фиксировано. Как иногда это может быть в среднем 2 столбца, иногда 3 или даже больше. Поэтому я хочу общий код, который должен работать.
2 ответа
Один из самых простых и оптимизированных способов - создать список столбцов столбцов меток и использовать его с withColumn
как
pyspark
from pyspark.sql.functions import col
marksColumns = [col('marks1'), col('marks2')]
averageFunc = sum(x for x in marksColumns)/len(marksColumns)
df.withColumn('Result(Avg)', averageFunc).show(truncate=False)
и вы должны получить
+-----+------+------+-----------+
|name |marks1|marks2|Result(Avg)|
+-----+------+------+-----------+
|Alice|10 |20 |15.0 |
|Bob |20 |30 |25.0 |
+-----+------+------+-----------+
искровой Scala
в scala процесс почти такой же, как в python выше
import org.apache.spark.sql.functions.{col, lit}
val marksColumns = Array(col("marks1"), col("marks2"))
val averageFunc = marksColumns.foldLeft(lit(0)){(x, y) => x+y}/marksColumns.length
df.withColumn("Result(Avg)", averageFunc).show(false)
который должен дать вам тот же вывод, что и в pyspark
Я надеюсь, что ответ полезен
Это так же просто, как использование пользовательских функций. Создав определенный UDF для работы со средним числом столбцов, вы сможете использовать его столько раз, сколько захотите.
питон
В этом фрагменте я создаю UDF, который принимает массив столбцов и вычисляет его среднее значение.
from pyspark.sql.functions import udf, array
from pyspark.sql.types import DoubleType
avg_cols = udf(lambda array: sum(array)/len(array), DoubleType())
df.withColumn("average", avg_cols(array("marks1", "marks2"))).show()
Выход:
+-----+------+------+--------+
| name|marks1|marks2| average|
+-----+------+------+--------+
|Alice| 10| 20| 15.0|
| Bob| 20| 30| 25.0|
+-----+------+------+--------+
Scala
При помощи Scala API вы должны обрабатывать выбранные столбцы как строку. Вам просто нужно выбрать столбцы с помощью Spark struct
функция.
import org.apache.spark.sql.functions._
import spark.implicits._
import scala.util.Try
def average = udf((row: Row) => {
val values = row.toSeq.map(x => Try(x.toString.toDouble).toOption).filter(_.isDefined).map(_.get)
if(values.nonEmpty) values.sum / values.length else 0.0
})
df.withColumn("average", average(struct($"marks1", $"marks2"))).show()
Как вы можете видеть, я преобразую все значения в Double с Try
, так что если значение не может быть приведено, оно не будет генерировать никаких исключений, выполняя усреднение только для тех столбцов, которые определены.
И это все:)