Как использовать класс Scala внутри Pyspark

Я долго искал, есть ли способ использовать Scala класс в Pysparkи я не нашел ни документации, ни руководства по этому вопросу.

Допустим, я создаю простой класс в Scala который использует некоторые библиотеки apache-spark, что-то вроде:

class SimpleClass(sqlContext: SQLContext, df: DataFrame, column: String) {
  def exe(): DataFrame = {
    import sqlContext.implicits._

    df.select(col(column))
  }
}
  • Есть ли какой-нибудь возможный способ использовать этот класс в Pyspark?
  • Это слишком сложно?
  • Должен ли я создать .py файл?
  • Есть ли руководство, которое показывает, как это сделать?

Кстати, я также посмотрел на spark код и я чувствовал себя немного потерянным, и я был неспособен воспроизвести их функциональность для моих собственных целей.

2 ответа

Решение

Да, это возможно, хотя может быть далеко не тривиальным. Как правило, вам нужна Java (дружественная) оболочка, чтобы вам не приходилось иметь дело с функциями Scala, которые не могут быть легко выражены с помощью простой Java и, как результат, не очень хорошо работают со шлюзом Py4J.

Предполагая, что ваш класс в пакете com.example и есть Python DataFrame называется df

df = ... # Python DataFrame

вам придется:

  1. Постройте банку, используя ваш любимый инструмент для сборки.

  2. Включите его в classpath драйвера, например, используя --driver-class-path аргумент для оболочки PySpark / spark-submit, В зависимости от точного кода вам, возможно, придется передать его, используя --jars также

  3. Извлечь экземпляр JVM из Python SparkContext пример:

    jvm = sc._jvm
    
  4. Извлечь Скала SQLContext из SQLContext пример:

    ssqlContext = sqlContext._ssql_ctx
    
  5. Извлечь Java DataFrame от df:

    jdf = df._jdf
    
  6. Создать новый экземпляр SimpleClass:

    simpleObject = jvm.com.example.SimpleClass(ssqlContext, jdf, "v")
    
  7. Вызовexe метод и обернуть результат с помощью Python DataFrame:

    from pyspark.sql import DataFrame
    
    DataFrame(simpleObject.exe(), ssqlContext)
    

Результат должен быть действительным PySpark DataFrame, Конечно, вы можете объединить все шаги в один звонок.

Важный: этот подход возможен, только если код Python выполняется исключительно на драйвере. Его нельзя использовать внутри действия или преобразования Python. См. Как использовать функцию Java/Scala из действия или преобразования? для деталей.

В качестве обновления user1560062, учитывая, что API-интерфейсы Spark развивались за последние шесть лет, рецепт, который работает в Spark-3.2, выглядит следующим образом:

  1. Скомпилируйте код Scala в файл JAR (например, используя sbt assembly)
  2. Включите файл JAR в --jarsаргумент spark-submitвместе с любым --py-filesаргументы, необходимые для локальных определений пакетов
  3. Извлеките экземпляр JVM в Python:
      jvm = spark._jvm
  1. Извлеките представление Java SparkSession:
      jSess = spark._jsparkSession
  1. Извлеките Java-представление PySpark:
      jdf = df._jdf
  1. Создайте новый экземпляр SimpleClass:
      simpleObject = jvm.com.example.SimpleClass(jSess, jdf, "v")
  1. Позвоните exeметод и преобразовать его вывод в PySpark DataFrame:
      from pyspark.sql import DataFrame

result = DataFrame(simpleObject.exe(), spark)
Другие вопросы по тегам