Может ли каждый Spark UDAF использоваться с Window?
Я всегда думал, что Spark не позволяет определять User-Defined-Window-Functions. Я только что протестировал пример UDAF "Среднее геометрическое" отсюда ( https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html) как оконную функцию, и, похоже, он работает нормально, например:
val geomMean = new GeometricMean
(1 to 10).map(i=>
(i,i.toDouble)
)
.toDF("i","x")
.withColumn("geom_mean",geomMean($"x").over(Window.orderBy($"i").rowsBetween(-1,1)))
.show()
+---+----+------------------+
| i| x| geom_mean|
+---+----+------------------+
| 1| 1.0|1.4142135623730951|
| 2| 2.0|1.8171205928321397|
| 3| 3.0|2.8844991406148166|
| 4| 4.0|3.9148676411688634|
| 5| 5.0| 4.93242414866094|
| 6| 6.0| 5.943921952763129|
| 7| 7.0| 6.952053289772898|
| 8| 8.0| 7.958114415792783|
| 9| 9.0| 8.962809493114328|
| 10|10.0| 9.486832980505138|
+---+----+------------------+
Я никогда не видел, чтобы в документах спарк об использовании UDAF в качестве оконной функции. Это разрешено, то есть результаты правильны? Я использую спарк 2.1 кстати
РЕДАКТИРОВАТЬ:
Что меня смущает, так это то, что при стандартной агрегации (т.е. groupBy
), данные всегда добавляются в буферы, то есть они всегда будут расти, а не сокращаться. С оконной функцией (особенно в сочетании с rowsBetween()
), данные также должны быть удалены из буфера, так как "старый" элемент будет выпадать из окна при перемещении по строкам, определенным в порядке. Я думаю о оконных функциях для перемещения по порядку с состоянием. Поэтому я предположил, что должен быть реализован метод удаления
1 ответ
Я не уверен, что именно ваш вопрос.
Может ли каждый Spark UDAF использоваться с Window?
да
Вот мой личный опыт в этой теме:
Я много работал в последнее время со Spark window functions
а также UDAFs
(Spark 2.0.1), и я подтверждаю, что они очень хорошо работают вместе. Результаты верны (при условии, что ваш UDAF написан правильно). Писать UDAF немного сложно, но как только вы их получите, они быстро переходят к следующим.
Я не тестировал все из них, но встроенные функции агрегирования из org.apache.spark.sql.functions._
работал и для меня. Поиск агрегатов по функциям. Я работал в основном с некоторыми классическими агрегаторами, такими как sum
, count
, avg
, stddev
и все они вернули правильные значения.