Зарегистрируйте UDF в SqlContext из Scala для использования в PySpark
Можно ли зарегистрировать UDF (или функцию), написанную на Scala, для использования в PySpark? Например:
val mytable = sc.parallelize(1 to 2).toDF("spam")
mytable.registerTempTable("mytable")
def addOne(m: Integer): Integer = m + 1
// Spam: 1, 2
В Scala теперь возможно следующее:
val UDFaddOne = sqlContext.udf.register("UDFaddOne", addOne _)
val mybiggertable = mytable.withColumn("moreSpam", UDFaddOne(mytable("spam")))
// Spam: 1, 2
// moreSpam: 2, 3
Я хотел бы использовать "UDFaddOne" в PySpark, как
%pyspark
mytable = sqlContext.table("mytable")
UDFaddOne = sqlContext.udf("UDFaddOne") # does not work
mybiggertable = mytable.withColumn("+1", UDFaddOne(mytable("spam"))) # does not work
Справочная информация: Мы являемся командой разработчиков, некоторые программируют на Scala, а некоторые на Python и хотели бы поделиться уже написанными функциями. Также было бы возможно сохранить это в библиотеке и импортировать это.
2 ответа
Насколько я знаю, PySpark не предоставляет эквивалента callUDF
функция и из-за этого невозможно напрямую получить доступ к зарегистрированной UDF.
Самым простым решением здесь является использование необработанного выражения SQL:
mytable.withColumn("moreSpam", expr("UDFaddOne({})".format("spam")))
## OR
sqlContext.sql("SELECT *, UDFaddOne(spam) AS moreSpam FROM mytable")
## OR
mytable.selectExpr("*", "UDFaddOne(spam) AS moreSpam")
Этот подход довольно ограничен, поэтому, если вам нужно поддерживать более сложные рабочие процессы, вы должны собрать пакет и предоставить готовые оболочки Python. Вы найдете пример оболочки UDAF в моем ответе Spark: Как сопоставить Python с пользовательскими функциями Scala или Java?
Следующее работало для меня (в основном сводка нескольких мест, включая ссылку, предоставленную zero323):
В скале:
package com.example
import org.apache.spark.sql.functions.udf
object udfObj extends Serializable {
def createUDF = {
udf((x: Int) => x + 1)
}
}
в python (предположим, sc - это контекст spark. Если вы используете spark 2.0, вы можете получить его из сеанса spark):
from py4j.java_gateway import java_import
from pyspark.sql.column import Column
jvm = sc._gateway.jvm
java_import(jvm, "com.example")
def udf_f(col):
return Column(jvm.com.example.udfObj.createUDF().apply(col))
И, конечно же, убедитесь, что jar, созданный в scala, добавлен с помощью --jars и --driver-class-path.
Итак, что здесь происходит:
Мы создаем функцию внутри сериализуемого объекта, которая возвращает udf в scala (я не уверен на 100%, что Serializable требуется, он был необходим для меня для более сложных UDF, так что это может быть потому, что ему нужно было передавать объекты Java).
В python мы используем доступ к внутреннему jvm (это закрытый член, так что он может быть изменен в будущем, но я не вижу возможности обойти это) и импортируем наш пакет, используя java_import. Мы получаем доступ к функции createUDF и вызываем ее. Это создает объект, у которого есть метод apply (функции в scala на самом деле являются java-объектами с методом apply). Вход для метода apply является столбцом. Результатом применения столбца является новый столбец, поэтому нам нужно обернуть его методом Column, чтобы сделать его доступным для withColumn.