Зарегистрируйте UDF в SqlContext из Scala для использования в PySpark

Можно ли зарегистрировать UDF (или функцию), написанную на Scala, для использования в PySpark? Например:

val mytable = sc.parallelize(1 to 2).toDF("spam")
mytable.registerTempTable("mytable")
def addOne(m: Integer): Integer = m + 1
// Spam: 1, 2

В Scala теперь возможно следующее:

val UDFaddOne = sqlContext.udf.register("UDFaddOne", addOne _)
val mybiggertable = mytable.withColumn("moreSpam", UDFaddOne(mytable("spam")))
// Spam: 1, 2
// moreSpam: 2, 3

Я хотел бы использовать "UDFaddOne" в PySpark, как

%pyspark

mytable = sqlContext.table("mytable")
UDFaddOne = sqlContext.udf("UDFaddOne") # does not work
mybiggertable = mytable.withColumn("+1", UDFaddOne(mytable("spam"))) # does not work

Справочная информация: Мы являемся командой разработчиков, некоторые программируют на Scala, а некоторые на Python и хотели бы поделиться уже написанными функциями. Также было бы возможно сохранить это в библиотеке и импортировать это.

2 ответа

Решение

Насколько я знаю, PySpark не предоставляет эквивалента callUDF функция и из-за этого невозможно напрямую получить доступ к зарегистрированной UDF.

Самым простым решением здесь является использование необработанного выражения SQL:

mytable.withColumn("moreSpam", expr("UDFaddOne({})".format("spam")))

## OR
sqlContext.sql("SELECT *, UDFaddOne(spam) AS moreSpam FROM mytable")

## OR
mytable.selectExpr("*", "UDFaddOne(spam) AS moreSpam")

Этот подход довольно ограничен, поэтому, если вам нужно поддерживать более сложные рабочие процессы, вы должны собрать пакет и предоставить готовые оболочки Python. Вы найдете пример оболочки UDAF в моем ответе Spark: Как сопоставить Python с пользовательскими функциями Scala или Java?

Следующее работало для меня (в основном сводка нескольких мест, включая ссылку, предоставленную zero323):

В скале:

package com.example
import org.apache.spark.sql.functions.udf

object udfObj extends Serializable {
  def createUDF = {
    udf((x: Int) => x + 1)
  }
}

в python (предположим, sc - это контекст spark. Если вы используете spark 2.0, вы можете получить его из сеанса spark):

from py4j.java_gateway import java_import
from pyspark.sql.column import Column

jvm = sc._gateway.jvm
java_import(jvm, "com.example")
def udf_f(col):
    return Column(jvm.com.example.udfObj.createUDF().apply(col))

И, конечно же, убедитесь, что jar, созданный в scala, добавлен с помощью --jars и --driver-class-path.

Итак, что здесь происходит:

Мы создаем функцию внутри сериализуемого объекта, которая возвращает udf в scala (я не уверен на 100%, что Serializable требуется, он был необходим для меня для более сложных UDF, так что это может быть потому, что ему нужно было передавать объекты Java).

В python мы используем доступ к внутреннему jvm (это закрытый член, так что он может быть изменен в будущем, но я не вижу возможности обойти это) и импортируем наш пакет, используя java_import. Мы получаем доступ к функции createUDF и вызываем ее. Это создает объект, у которого есть метод apply (функции в scala на самом деле являются java-объектами с методом apply). Вход для метода apply является столбцом. Результатом применения столбца является новый столбец, поэтому нам нужно обернуть его методом Column, чтобы сделать его доступным для withColumn.

Другие вопросы по тегам