Использование Scala UDF в PySpark

Я хочу иметь возможность использовать функцию Scala как UDF в PySpark

package com.test

object ScalaPySparkUDFs extends Serializable {
    def testFunction1(x: Int): Int = { x * 2 }
    def testUDFFunction1 = udf { x: Int => testFunction1(x) }
} 

Я могу получить доступ testFunction1 в PySpark и он возвращает значения:

functions = sc._jvm.com.test.ScalaPySparkUDFs 
functions.testFunction1(10)

Я хочу использовать эту функцию как UDF, в идеале withColumn вызов:

row = Row("Value")
numbers = sc.parallelize([1,2,3,4]).map(row).toDF()
numbers.withColumn("Result", testUDFFunction1(numbers['Value']))

Я думаю, что многообещающий подход такой, как здесь: Spark: Как сопоставить Python с пользовательскими функциями Scala или Java?

Тем не менее, при внесении изменений в код, найденный там, чтобы использовать testUDFFunction1 вместо:

def udf_test(col):
    sc = SparkContext._active_spark_context
    _f = sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction1.apply
    return Column(_f(_to_seq(sc, [col], _to_java_column)))

Я получил:

 AttributeError: 'JavaMember' object has no attribute 'apply' 

Я не понимаю этого, потому что я верю testUDFFunction1 есть метод применения?

Я не хочу использовать выражения типа, найденного здесь: Зарегистрируйте UDF в SqlContext из Scala для использования в PySpark

Будем благодарны за любые предложения о том, как сделать эту работу!

2 ответа

Решение

Вопрос, который вы связали, использует Scala object, Scala object это одиночка, и вы можете использовать apply метод напрямую.

Здесь вы используете нулевую функцию, которая возвращает объект UserDefinedFunction class co Вы должны сначала вызвать функцию:

_f = sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction1() # Note () at the end
Column(_f.apply(_to_seq(sc, [col], _to_java_column)))

Согласитесь с @user6910411, вы должны вызывать метод apply непосредственно для функции. Итак, ваш код будет.

UDF в Скала:

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._


object ScalaPySparkUDFs {

    def testFunction1(x: Int): Int = { x * 2 }

    def getFun(): UserDefinedFunction = udf(testFunction1 _ )
}

Код PySpark:

def test_udf(col):
    sc = spark.sparkContext
    _test_udf = sc._jvm.com.test.ScalaPySparkUDFs.getFun()
    return Column(_test_udf.apply(_to_seq(sc, [col], _to_java_column)))


row = Row("Value")
numbers = sc.parallelize([1,2,3,4]).map(row).toDF()
numbers.withColumn("Result", test_udf(numbers['Value']))
Другие вопросы по тегам