Использовать внешнюю библиотеку в задании pyspark в кластере Spark от google-dataproc

У меня есть спарк кластер, который я создал через Google DataProc. Я хочу использовать библиотеку csv из блоков данных (см. https://github.com/databricks/spark-csv). Итак, я сначала проверил это так:

Я начал сессию ssh с главным узлом моего кластера, а затем ввел:

pyspark --packages com.databricks:spark-csv_2.11:1.2.0

Затем он запустил оболочку pyspark, в которую я ввел:

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('gs:/xxxx/foo.csv')
df.show()

И это сработало.

Мой следующий шаг - запустить эту работу с моей основной машины с помощью команды:

gcloud beta dataproc jobs submit pyspark --cluster <my-dataproc-cluster> my_job.py

Но здесь это не работает, и я получаю ошибку. Я думаю, потому что я не дал --packages com.databricks:spark-csv_2.11:1.2.0 в качестве аргумента, но я попробовал 10 разных способов дать это, и мне не удалось.

Мой вопрос:

  1. была ли установлена ​​библиотека данных csv после того, как я набрал pyspark --packages com.databricks:spark-csv_2.11:1.2.0
  2. могу ли я написать строку в моем job.py для того, чтобы импортировать это?
  3. или какие параметры я должен дать моей команде gcloud, чтобы импортировать или установить?

2 ответа

Решение

Короткий ответ

Есть причуды в упорядочении аргументов, где --packages не принимается spark-submit если это произойдет после my_job.py аргумент. Чтобы обойти это, вы можете сделать следующее при отправке из CLI Dataproc:

gcloud beta dataproc jobs submit pyspark --cluster <my-dataproc-cluster> \
    --properties spark.jars.packages=com.databricks:spark-csv_2.11:1.2.0 my_job.py

В основном, просто добавьте --properties spark.jars.packages=com.databricks:spark-csv_2.11:1.2.0 перед .py файл в вашей команде.

Длинный ответ

Таким образом, это на самом деле другая проблема, чем известное отсутствие поддержки --jars в gcloud beta dataproc jobs submit pyspark; кажется, что без Dataproc явно признавая --packages как особый spark-submitфлаг уровня, он пытается передать его после аргументов приложения, так что spark-submit позволяет --packages указывать аргумент приложения, а не анализировать его как параметр уровня передачи. Действительно, в сеансе SSH следующее не работает:

# Doesn't work if job.py depends on that package.
spark-submit job.py --packages com.databricks:spark-csv_2.11:1.2.0

Но переключение порядка аргументов снова работает, хотя в pyspark В случае, оба заказа работают:

# Works with dependencies on that package.
spark-submit --packages com.databricks:spark-csv_2.11:1.2.0 job.py
pyspark job.py --packages com.databricks:spark-csv_2.11:1.2.0
pyspark --packages com.databricks:spark-csv_2.11:1.2.0 job.py

Так что даже если spark-submit job.py должен быть заменой всего, что раньше называлось pyspark job.py, разница в порядке разбора для таких вещей, как --packages означает, что на самом деле это не 100% совместимая миграция. Это может быть чем-то, что можно отследить на стороне Spark.

Во всяком случае, к счастью, есть обходной путь, так как --packages это просто еще один псевдоним для свойства Spark spark.jars.packagesи CLI Dataproc поддерживает свойства просто отлично. Так что вы можете просто сделать следующее:

gcloud beta dataproc jobs submit pyspark --cluster <my-dataproc-cluster> \
    --properties spark.jars.packages=com.databricks:spark-csv_2.11:1.2.0 my_job.py

Обратите внимание, что --properties должен прийти раньше my_job.pyв противном случае он отправляется как аргумент приложения, а не как флаг конфигурации. Надеюсь, что это работает для вас! Обратите внимание, что эквивалент в сеансе SSH будет spark-submit --packages com.databricks:spark-csv_2.11:1.2.0 job.py,

В дополнение к @Dennis.

Обратите внимание, что если вам нужно загрузить несколько внешних пакетов, вам нужно указать собственный escape-символ, например:

--properties ^#^spark.jars.packages=org.elasticsearch:elasticsearch-spark_2.10:2.3.2,com.data‌​bricks:spark-avro_2.10:2.0.1

Обратите внимание на ^#^ прямо перед списком пакетов. Увидеть gcloud topic escaping Больше подробностей.

Другие вопросы по тегам