Найти медиану в искровом SQL для нескольких столбцов с двойным типом данных

У меня есть требование найти медиану для нескольких столбцов с двойным типом данных. Запросить предложение, чтобы найти правильный подход.

Ниже приведен мой пример набора данных с одним столбцом. Я ожидаю, что значение медианы будет возвращено как 1 для моего образца.

  scala> sqlContext.sql("select num from test").show();
+---+
|num|
+---+
|0.0|
|0.0|
|1.0|
|1.0|
|1.0|
|1.0|
+---+

Я попробовал следующие варианты

1) Улей процентиль UDAF, он работал только для BigInt.

2) Улей UDAT Pertile_approx, но он не работает должным образом (возвращает 0,25 против 1).

sqlContext.sql ("выберите процент из приложения (число 0,5) из теста"). show ();

+----+
| _c0|
+----+
|0.25|
+----+

3) Функция Spark Window процента_rank- чтобы найти медиану, как я вижу, чтобы найти все проценты выше 0,5 и выбрать соответствующее значение максимального числа процентов_ранка. Но это работает не во всех случаях, особенно когда у меня есть даже количество записей, в этом случае медиана - это среднее значение среднего значения в отсортированном распределении.

Кроме того, в процентном выражении, так как мне нужно найти медиану для нескольких столбцов, я должен рассчитать ее в разных фреймах данных, что для меня является немного сложным методом. Пожалуйста, поправьте меня, если мое понимание неверно.

+---+-------------+
|num|percent_rank |
+---+-------------+
|0.0|0.0|
|0.0|0.0|
|1.0|0.4|
|1.0|0.4|
|1.0|0.4|
|1.0|0.4|
+---+---+

1 ответ

Решение

Какую версию Apache Spark вы используете из любопытства? В Apache Spark 2.0+ были исправлены некоторые ошибки approxQuantile,

Если бы я должен был запустить фрагмент кода pySpark ниже:

rdd = sc.parallelize([[1, 0.0], [1, 0.0], [1, 1.0], [1, 1.0], [1, 1.0], [1, 1.0]])
df = rdd.toDF(['id', 'num'])
df.createOrReplaceTempView("df")

с median расчет с использованием approxQuantile как:

df.approxQuantile("num", [0.5], 0.25)

или же

spark.sql("select percentile_approx(num, 0.5) from df").show()

результаты:

  • Spark 2.0.0: 0.25
  • Spark 2.0.1: 1.0
  • Spark 2.1.0: 1.0

Обратите внимание, что это приблизительные цифры (через approxQuantile) хотя в целом это должно хорошо работать. Если вам нужна точная медиана, одним из подходов является использование numpy.median, Ниже приведен фрагмент кода df пример, основанный на ответе gench на вопрос " Как найти медиану в Apache Spark с API Python Dataframe?":

from pyspark.sql.types import *
import pyspark.sql.functions as F
import numpy as np

def find_median(values):
    try:
        median = np.median(values) #get the median of values in a list in each row
        return round(float(median),2)
    except Exception:
        return None #if there is anything wrong with the given values

median_finder = F.udf(find_median,FloatType())

df2 = df.groupBy("id").agg(F.collect_list("num").alias("nums"))
df2 = df2.withColumn("median", median_finder("nums"))

# print out
df2.show()

с выводом:

+---+--------------------+------+
| id|                nums|median|
+---+--------------------+------+
|  1|[0.0, 0.0, 1.0, 1...|   1.0|
+---+--------------------+------+

Обновлено: версия Spark 1.6 Scala с использованием RDD

Если вы используете Spark 1.6, вы можете рассчитать median используя код Scala через ответ Евгения Жуленева. Как вычислить точную медиану с помощью Apache Spark. Ниже приведен модифицированный код, который работает с нашим примером.

import org.apache.spark.SparkContext._

  val rdd: RDD[Double] = sc.parallelize(Seq((0.0), (0.0), (1.0), (1.0), (1.0), (1.0)))

  val sorted = rdd.sortBy(identity).zipWithIndex().map {
    case (v, idx) => (idx, v)
  }

  val count = sorted.count()

  val median: Double = if (count % 2 == 0) {
    val l = count / 2 - 1
    val r = l + 1
    (sorted.lookup(l).head + sorted.lookup(r).head).toDouble / 2
  } else sorted.lookup(count / 2).head.toDouble

с выводом:

// output
import org.apache.spark.SparkContext._
rdd: org.apache.spark.rdd.RDD[Double] = ParallelCollectionRDD[227] at parallelize at <console>:34
sorted: org.apache.spark.rdd.RDD[(Long, Double)] = MapPartitionsRDD[234] at map at <console>:36
count: Long = 6
median: Double = 1.0

Обратите внимание, что это точное вычисление медианы с использованием RDDs - то есть вам нужно будет преобразовать столбец DataFrame в RDD, чтобы выполнить этот расчет.

Другие вопросы по тегам