Пользовательская функция столбца Spark Build, определяемая пользователем функция
Я использую Scala и хочу создать свою собственную функцию DataFrame. Например, я хочу обработать столбец как массив, выполнить итерацию по каждому элементу и произвести расчет.
Для начала я пытаюсь реализовать свой собственный метод getMax. Таким образом, столбец x будет иметь значения [3,8,2,5,9], и ожидаемый результат метода будет 9.
Вот как это выглядит в Scala
def getMax(inputArray: Array[Int]): Int = {
var maxValue = inputArray(0)
for (i <- 1 until inputArray.length if inputArray(i) > maxValue) {
maxValue = inputArray(i)
}
maxValue
}
Это то, что я до сих пор и получаю эту ошибку
"value length is not a member of org.apache.spark.sql.column",
и я не знаю, как еще перебрать столбец.
def getMax(col: Column): Column = {
var maxValue = col(0)
for (i <- 1 until col.length if col(i) > maxValue){
maxValue = col(i)
}
maxValue
}
Как только я смогу реализовать свой собственный метод, я создам функцию столбца
val value_max:org.apache.spark.sql.Column=getMax(df.col(“value”)).as(“value_max”)
И тогда я надеюсь, что смогу использовать это в инструкции SQL, например
val sample = sqlContext.sql("SELECT value_max(x) FROM table")
и ожидаемый результат будет 9, учитывая входной столбец [3,8,2,5,9]
Я следую за ответом из другого потока Spark Scala - Как перебирать строки в фрейме данных и добавлять вычисляемые значения в виде новых столбцов фрейма данных, где они создают частный метод для стандартного отклонения. Расчеты, которые я сделаю, будут более сложными, чем это (например, я буду сравнивать каждый элемент в столбце), иду ли я в правильных направлениях или я должен больше смотреть на пользовательские функции?
2 ответа
В Spark DataFrame вы не можете перебирать элементы столбца, используя подходы, о которых вы думали, потому что столбец не является итеративным объектом.
Однако для обработки значений столбца у вас есть несколько вариантов, и правильный вариант зависит от вашей задачи:
1) Использование существующих встроенных функций
Spark SQL уже имеет множество полезных функций для обработки столбцов, включая функции агрегирования и преобразования. Большинство из них вы можете найти в functions
пакет ( документация здесь). Некоторые другие (бинарные функции в целом) вы можете найти прямо в Column
объект ( документация здесь). Так что, если вы можете использовать их, обычно это лучший вариант. Примечание: не забывайте о функциях окна.
2) Создание UDF
Если вы не можете выполнить свою задачу с помощью встроенных функций, вы можете рассмотреть возможность определения пользовательской функции (UDF). Они полезны, когда вы можете обрабатывать каждый элемент столбца независимо, и вы ожидаете создать новый столбец с тем же числом строк, что и исходный (не агрегированный столбец). Этот подход довольно прост: сначала вы определяете простую функцию, затем регистрируете ее как UDF, а затем используете ее. Пример:
def myFunc: (String => String) = { s => s.toLowerCase }
import org.apache.spark.sql.functions.udf
val myUDF = udf(myFunc)
val newDF = df.withColumn("newCol", myUDF(df("oldCol")))
Для получения дополнительной информации, вот хорошая статья.
3) Использование UDAF
Если ваша задача заключается в создании агрегированных данных, вы можете определить UDAF (пользовательскую функцию агрегирования). У меня нет большого опыта в этом, но я могу указать вам хороший учебник:
https://ragrawal.wordpress.com/2015/11/03/spark-custom-udaf-example/
4) Возврат к обработке RDD
Если вы действительно не можете использовать описанные выше параметры или если ваша задача обработки зависит от разных строк для обработки одной и это не агрегация, то я думаю, что вам нужно будет выбрать нужный столбец и обработать его, используя соответствующий RDD. Пример:
val singleColumnDF = df("column")
val myRDD = singleColumnDF.rdd
// process myRDD
Итак, были варианты, которые я мог придумать. Я надеюсь, что это помогает.
Простой пример приведен в отличной документации, где целый раздел посвящен UDF:
import org.apache.spark.sql._
val df = Seq(("id1", 1), ("id2", 4), ("id3", 5)).toDF("id", "value")
val spark = df.sparkSession
spark.udf.register("simpleUDF", (v: Int) => v * v)
df.select($"id", callUDF("simpleUDF", $"value"))