Как использовать класс Scala внутри Pyspark
Я долго искал, есть ли способ использовать Scala
класс в Pyspark
и я не нашел ни документации, ни руководства по этому вопросу.
Допустим, я создаю простой класс в Scala
который использует некоторые библиотеки apache-spark
, что-то вроде:
class SimpleClass(sqlContext: SQLContext, df: DataFrame, column: String) {
def exe(): DataFrame = {
import sqlContext.implicits._
df.select(col(column))
}
}
- Есть ли какой-нибудь возможный способ использовать этот класс в
Pyspark
? - Это слишком сложно?
- Должен ли я создать
.py
файл? - Есть ли руководство, которое показывает, как это сделать?
Кстати, я также посмотрел на spark
код и я чувствовал себя немного потерянным, и я был неспособен воспроизвести их функциональность для моих собственных целей.
2 ответа
Да, это возможно, хотя может быть далеко не тривиальным. Как правило, вам нужна Java (дружественная) оболочка, чтобы вам не приходилось иметь дело с функциями Scala, которые не могут быть легко выражены с помощью простой Java и, как результат, не очень хорошо работают со шлюзом Py4J.
Предполагая, что ваш класс в пакете com.example
и есть Python DataFrame
называется df
df = ... # Python DataFrame
вам придется:
Постройте банку, используя ваш любимый инструмент для сборки.
Включите его в classpath драйвера, например, используя
--driver-class-path
аргумент для оболочки PySpark /spark-submit
, В зависимости от точного кода вам, возможно, придется передать его, используя--jars
такжеИзвлечь экземпляр JVM из Python
SparkContext
пример:jvm = sc._jvm
Извлечь Скала
SQLContext
изSQLContext
пример:ssqlContext = sqlContext._ssql_ctx
Извлечь Java
DataFrame
отdf
:jdf = df._jdf
Создать новый экземпляр
SimpleClass
:simpleObject = jvm.com.example.SimpleClass(ssqlContext, jdf, "v")
Вызов
exe
метод и обернуть результат с помощью PythonDataFrame
:from pyspark.sql import DataFrame DataFrame(simpleObject.exe(), ssqlContext)
Результат должен быть действительным PySpark DataFrame
, Конечно, вы можете объединить все шаги в один звонок.
Важный: этот подход возможен, только если код Python выполняется исключительно на драйвере. Его нельзя использовать внутри действия или преобразования Python. См. Как использовать функцию Java/Scala из действия или преобразования? для деталей.
В качестве обновления user1560062, учитывая, что API-интерфейсы Spark развивались за последние шесть лет, рецепт, который работает в Spark-3.2, выглядит следующим образом:
- Скомпилируйте код Scala в файл JAR (например, используя
sbt assembly
) - Включите файл JAR в
--jars
аргументspark-submit
вместе с любым--py-files
аргументы, необходимые для локальных определений пакетов - Извлеките экземпляр JVM в Python:
jvm = spark._jvm
- Извлеките представление Java
SparkSession
:
jSess = spark._jsparkSession
- Извлеките Java-представление PySpark:
jdf = df._jdf
- Создайте новый экземпляр
SimpleClass
:
simpleObject = jvm.com.example.SimpleClass(jSess, jdf, "v")
- Позвоните
exe
метод и преобразовать его вывод в PySparkDataFrame
:
from pyspark.sql import DataFrame
result = DataFrame(simpleObject.exe(), spark)