Может ли каждый Spark UDAF использоваться с Window?

Я всегда думал, что Spark не позволяет определять User-Defined-Window-Functions. Я только что протестировал пример UDAF "Среднее геометрическое" отсюда ( https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html) как оконную функцию, и, похоже, он работает нормально, например:

val geomMean = new GeometricMean

(1 to 10).map(i=>
  (i,i.toDouble)
)
.toDF("i","x")
.withColumn("geom_mean",geomMean($"x").over(Window.orderBy($"i").rowsBetween(-1,1)))
.show()

+---+----+------------------+
|  i|   x|         geom_mean|
+---+----+------------------+
|  1| 1.0|1.4142135623730951|
|  2| 2.0|1.8171205928321397|
|  3| 3.0|2.8844991406148166|
|  4| 4.0|3.9148676411688634|
|  5| 5.0|  4.93242414866094|
|  6| 6.0| 5.943921952763129|
|  7| 7.0| 6.952053289772898|
|  8| 8.0| 7.958114415792783|
|  9| 9.0| 8.962809493114328|
| 10|10.0| 9.486832980505138|
+---+----+------------------+

Я никогда не видел, чтобы в документах спарк об использовании UDAF в качестве оконной функции. Это разрешено, то есть результаты правильны? Я использую спарк 2.1 кстати

РЕДАКТИРОВАТЬ:

Что меня смущает, так это то, что при стандартной агрегации (т.е. groupBy), данные всегда добавляются в буферы, то есть они всегда будут расти, а не сокращаться. С оконной функцией (особенно в сочетании с rowsBetween()), данные также должны быть удалены из буфера, так как "старый" элемент будет выпадать из окна при перемещении по строкам, определенным в порядке. Я думаю о оконных функциях для перемещения по порядку с состоянием. Поэтому я предположил, что должен быть реализован метод удаления

1 ответ

Решение

Я не уверен, что именно ваш вопрос.

Может ли каждый Spark UDAF использоваться с Window?

да

Вот мой личный опыт в этой теме:

Я много работал в последнее время со Spark window functions а также UDAFs (Spark 2.0.1), и я подтверждаю, что они очень хорошо работают вместе. Результаты верны (при условии, что ваш UDAF написан правильно). Писать UDAF немного сложно, но как только вы их получите, они быстро переходят к следующим.

Я не тестировал все из них, но встроенные функции агрегирования из org.apache.spark.sql.functions._ работал и для меня. Поиск агрегатов по функциям. Я работал в основном с некоторыми классическими агрегаторами, такими как sum, count, avg, stddev и все они вернули правильные значения.

Другие вопросы по тегам