Передача данных из scala в python с использованием JEP

Вот что я пытаюсь сделать:

  1. Я прочитал данные в Scala
  2. Извлечь несколько столбцов
  3. Используя JEP, передайте созданный фрейм данных в скрипт Python
  4. Скрипт Python преобразует фрейм данных в pandas, выполняет некоторую операцию и возвращает ее обратно

Однако я не уверен, как передать dataframe скрипту Python. Вот сценарий Python (это просто пример сценария, а не фактический):

import findspark
findspark.init()
import pandas as pd
#from pyspark.sql import types.*
from pyspark.sql import DataFrame as dataframe

def tes(df: dataframe):
    df = df.toPandas()
    df['concatenate'] = df['country'] + df['datasourceProvidedCountry']
    return dataframe(df)

и он продолжает сбой со следующей ошибкой:

jep.JepException: <class 'ImportError'>: py4j.protocol
  at /usr/local/lib64/python3.6/site-packages/jep/java_import_hook.__getattr__(java_import_hook.py:57)
  at /home/hadoop/testpy.<module>(testpy.py:5)
  at jep.Jep.run(Native Method)
  at jep.Jep.runScript(Jep.java:359)
  at jep.Jep.runScript(Jep.java:335)
  ... 49 elided
Caused by: java.lang.ClassNotFoundException: py4j.protocol
  at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  ... 52 more
spark-shell --conf spark.driver.extraLibraryPath=:/usr/local/lib64/python3.6/site-packages/jep:/usr/local/lib/python3.6/site-packages/py4j/ --jars /home/hadoop/jep-3.8.2.jar

Кто-нибудь может посоветовать, как я могу передать фрейм данных из scala в pyspark с помощью Jep (Если это дубликат, укажите мне на нужную ветку, потому что я не смог ее найти)?

0 ответов

У меня такое же требование, и я тоже пробовал с Jep. К сожалению, Jep не подходит для этого варианта использования.

Не найденный py4j.protocol вызван Jep ClassEnquirer, когда и python, и jave имеют библиотеку с тем же именем, Jep будет рассматривать библиотеку java. Вы можете решить эту проблему, исключив py4j из пакета Spark из своего Java-приложения или создав настраиваемый ClassEnquirer для рассмотрения python py4j.

Вам также необходимо обновить конструктор Jep, установить для параметра useSubInterpreter значение false и перестроить его.

public Jep(JepConfig config) throws JepException {
    this(config, false);
}

Теперь ошибка должна быть устранена. Однако объект, передаваемый в функцию python, - это объект PyObject, содержащий ссылку на Java, который не является объектом фрейма данных pyspark, поэтому у него нет функции toPandas().

Альтернативным способом может быть использование gRPC или Apache thrift, вы можете проверить документ для более подробной информации.

Можно передавать данные из Apache Spark правильный (JVM) код Python с использованием Apache Arrow - начиная с 2.3 Spark использует форматы Arrow, которые можно использовать как в JVM, так и в CPython.

См. https://fossies.org/diffs/spark/2.3.3_vs_2.4.0/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala-diff.html для вдохновения..

Я передавал данные между кодом JVM и CPython в одном процессе (без сокетов), используя jep (Встроенный Java-Python) DirectNDArray ("вне кучи", "нулевая копия").

Пожалуйста, дайте мне знать, выглядит ли это достаточно хорошо и улучшит ли этот ответ.

Другие вопросы по тегам