Пользовательская функция столбца Spark Build, определяемая пользователем функция

Я использую Scala и хочу создать свою собственную функцию DataFrame. Например, я хочу обработать столбец как массив, выполнить итерацию по каждому элементу и произвести расчет.

Для начала я пытаюсь реализовать свой собственный метод getMax. Таким образом, столбец x будет иметь значения [3,8,2,5,9], и ожидаемый результат метода будет 9.

Вот как это выглядит в Scala

def getMax(inputArray: Array[Int]): Int = {
   var maxValue = inputArray(0)
   for (i <- 1 until inputArray.length if inputArray(i) > maxValue) {
     maxValue = inputArray(i)
   }
   maxValue
}

Это то, что я до сих пор и получаю эту ошибку

"value length is not a member of org.apache.spark.sql.column", 

и я не знаю, как еще перебрать столбец.

def getMax(col: Column): Column = {
var maxValue = col(0)
for (i <- 1 until col.length if col(i) > maxValue){
    maxValue = col(i)
}
maxValue

}

Как только я смогу реализовать свой собственный метод, я создам функцию столбца

val value_max:org.apache.spark.sql.Column=getMax(df.col(“value”)).as(“value_max”)

И тогда я надеюсь, что смогу использовать это в инструкции SQL, например

val sample = sqlContext.sql("SELECT value_max(x) FROM table")

и ожидаемый результат будет 9, учитывая входной столбец [3,8,2,5,9]

Я следую за ответом из другого потока Spark Scala - Как перебирать строки в фрейме данных и добавлять вычисляемые значения в виде новых столбцов фрейма данных, где они создают частный метод для стандартного отклонения. Расчеты, которые я сделаю, будут более сложными, чем это (например, я буду сравнивать каждый элемент в столбце), иду ли я в правильных направлениях или я должен больше смотреть на пользовательские функции?

2 ответа

Решение

В Spark DataFrame вы не можете перебирать элементы столбца, используя подходы, о которых вы думали, потому что столбец не является итеративным объектом.

Однако для обработки значений столбца у вас есть несколько вариантов, и правильный вариант зависит от вашей задачи:

1) Использование существующих встроенных функций

Spark SQL уже имеет множество полезных функций для обработки столбцов, включая функции агрегирования и преобразования. Большинство из них вы можете найти в functions пакет ( документация здесь). Некоторые другие (бинарные функции в целом) вы можете найти прямо в Column объект ( документация здесь). Так что, если вы можете использовать их, обычно это лучший вариант. Примечание: не забывайте о функциях окна.

2) Создание UDF

Если вы не можете выполнить свою задачу с помощью встроенных функций, вы можете рассмотреть возможность определения пользовательской функции (UDF). Они полезны, когда вы можете обрабатывать каждый элемент столбца независимо, и вы ожидаете создать новый столбец с тем же числом строк, что и исходный (не агрегированный столбец). Этот подход довольно прост: сначала вы определяете простую функцию, затем регистрируете ее как UDF, а затем используете ее. Пример:

def myFunc: (String => String) = { s => s.toLowerCase }

import org.apache.spark.sql.functions.udf
val myUDF = udf(myFunc)

val newDF = df.withColumn("newCol", myUDF(df("oldCol")))

Для получения дополнительной информации, вот хорошая статья.

3) Использование UDAF

Если ваша задача заключается в создании агрегированных данных, вы можете определить UDAF (пользовательскую функцию агрегирования). У меня нет большого опыта в этом, но я могу указать вам хороший учебник:

https://ragrawal.wordpress.com/2015/11/03/spark-custom-udaf-example/

4) Возврат к обработке RDD

Если вы действительно не можете использовать описанные выше параметры или если ваша задача обработки зависит от разных строк для обработки одной и это не агрегация, то я думаю, что вам нужно будет выбрать нужный столбец и обработать его, используя соответствующий RDD. Пример:

val singleColumnDF = df("column")

val myRDD = singleColumnDF.rdd

// process myRDD

Итак, были варианты, которые я мог придумать. Я надеюсь, что это помогает.

Простой пример приведен в отличной документации, где целый раздел посвящен UDF:

import org.apache.spark.sql._

val df = Seq(("id1", 1), ("id2", 4), ("id3", 5)).toDF("id", "value")
val spark = df.sparkSession
spark.udf.register("simpleUDF", (v: Int) => v * v)
df.select($"id", callUDF("simpleUDF", $"value"))
Другие вопросы по тегам