Использование Scala UDF в PySpark
Я хочу иметь возможность использовать функцию Scala как UDF в PySpark
package com.test
object ScalaPySparkUDFs extends Serializable {
def testFunction1(x: Int): Int = { x * 2 }
def testUDFFunction1 = udf { x: Int => testFunction1(x) }
}
Я могу получить доступ testFunction1
в PySpark и он возвращает значения:
functions = sc._jvm.com.test.ScalaPySparkUDFs
functions.testFunction1(10)
Я хочу использовать эту функцию как UDF, в идеале withColumn
вызов:
row = Row("Value")
numbers = sc.parallelize([1,2,3,4]).map(row).toDF()
numbers.withColumn("Result", testUDFFunction1(numbers['Value']))
Я думаю, что многообещающий подход такой, как здесь: Spark: Как сопоставить Python с пользовательскими функциями Scala или Java?
Тем не менее, при внесении изменений в код, найденный там, чтобы использовать testUDFFunction1
вместо:
def udf_test(col):
sc = SparkContext._active_spark_context
_f = sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction1.apply
return Column(_f(_to_seq(sc, [col], _to_java_column)))
Я получил:
AttributeError: 'JavaMember' object has no attribute 'apply'
Я не понимаю этого, потому что я верю testUDFFunction1
есть метод применения?
Я не хочу использовать выражения типа, найденного здесь: Зарегистрируйте UDF в SqlContext из Scala для использования в PySpark
Будем благодарны за любые предложения о том, как сделать эту работу!
2 ответа
Вопрос, который вы связали, использует Scala object
, Scala object
это одиночка, и вы можете использовать apply
метод напрямую.
Здесь вы используете нулевую функцию, которая возвращает объект UserDefinedFunction
class co Вы должны сначала вызвать функцию:
_f = sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction1() # Note () at the end
Column(_f.apply(_to_seq(sc, [col], _to_java_column)))
Согласитесь с @user6910411, вы должны вызывать метод apply непосредственно для функции. Итак, ваш код будет.
UDF в Скала:
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._
object ScalaPySparkUDFs {
def testFunction1(x: Int): Int = { x * 2 }
def getFun(): UserDefinedFunction = udf(testFunction1 _ )
}
Код PySpark:
def test_udf(col):
sc = spark.sparkContext
_test_udf = sc._jvm.com.test.ScalaPySparkUDFs.getFun()
return Column(_test_udf.apply(_to_seq(sc, [col], _to_java_column)))
row = Row("Value")
numbers = sc.parallelize([1,2,3,4]).map(row).toDF()
numbers.withColumn("Result", test_udf(numbers['Value']))