Использовать внешнюю библиотеку в задании pyspark в кластере Spark от google-dataproc
У меня есть спарк кластер, который я создал через Google DataProc. Я хочу использовать библиотеку csv из блоков данных (см. https://github.com/databricks/spark-csv). Итак, я сначала проверил это так:
Я начал сессию ssh с главным узлом моего кластера, а затем ввел:
pyspark --packages com.databricks:spark-csv_2.11:1.2.0
Затем он запустил оболочку pyspark, в которую я ввел:
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('gs:/xxxx/foo.csv')
df.show()
И это сработало.
Мой следующий шаг - запустить эту работу с моей основной машины с помощью команды:
gcloud beta dataproc jobs submit pyspark --cluster <my-dataproc-cluster> my_job.py
Но здесь это не работает, и я получаю ошибку. Я думаю, потому что я не дал --packages com.databricks:spark-csv_2.11:1.2.0
в качестве аргумента, но я попробовал 10 разных способов дать это, и мне не удалось.
Мой вопрос:
- была ли установлена библиотека данных csv после того, как я набрал
pyspark --packages com.databricks:spark-csv_2.11:1.2.0
- могу ли я написать строку в моем
job.py
для того, чтобы импортировать это? - или какие параметры я должен дать моей команде gcloud, чтобы импортировать или установить?
2 ответа
Короткий ответ
Есть причуды в упорядочении аргументов, где --packages
не принимается spark-submit
если это произойдет после my_job.py
аргумент. Чтобы обойти это, вы можете сделать следующее при отправке из CLI Dataproc:
gcloud beta dataproc jobs submit pyspark --cluster <my-dataproc-cluster> \
--properties spark.jars.packages=com.databricks:spark-csv_2.11:1.2.0 my_job.py
В основном, просто добавьте --properties spark.jars.packages=com.databricks:spark-csv_2.11:1.2.0
перед .py
файл в вашей команде.
Длинный ответ
Таким образом, это на самом деле другая проблема, чем известное отсутствие поддержки --jars
в gcloud beta dataproc jobs submit pyspark
; кажется, что без Dataproc явно признавая --packages
как особый spark-submit
флаг уровня, он пытается передать его после аргументов приложения, так что spark-submit позволяет --packages
указывать аргумент приложения, а не анализировать его как параметр уровня передачи. Действительно, в сеансе SSH следующее не работает:
# Doesn't work if job.py depends on that package.
spark-submit job.py --packages com.databricks:spark-csv_2.11:1.2.0
Но переключение порядка аргументов снова работает, хотя в pyspark
В случае, оба заказа работают:
# Works with dependencies on that package.
spark-submit --packages com.databricks:spark-csv_2.11:1.2.0 job.py
pyspark job.py --packages com.databricks:spark-csv_2.11:1.2.0
pyspark --packages com.databricks:spark-csv_2.11:1.2.0 job.py
Так что даже если spark-submit job.py
должен быть заменой всего, что раньше называлось pyspark job.py
, разница в порядке разбора для таких вещей, как --packages
означает, что на самом деле это не 100% совместимая миграция. Это может быть чем-то, что можно отследить на стороне Spark.
Во всяком случае, к счастью, есть обходной путь, так как --packages
это просто еще один псевдоним для свойства Spark spark.jars.packages
и CLI Dataproc поддерживает свойства просто отлично. Так что вы можете просто сделать следующее:
gcloud beta dataproc jobs submit pyspark --cluster <my-dataproc-cluster> \
--properties spark.jars.packages=com.databricks:spark-csv_2.11:1.2.0 my_job.py
Обратите внимание, что --properties
должен прийти раньше my_job.py
в противном случае он отправляется как аргумент приложения, а не как флаг конфигурации. Надеюсь, что это работает для вас! Обратите внимание, что эквивалент в сеансе SSH будет spark-submit --packages com.databricks:spark-csv_2.11:1.2.0 job.py
,
В дополнение к @Dennis.
Обратите внимание, что если вам нужно загрузить несколько внешних пакетов, вам нужно указать собственный escape-символ, например:
--properties ^#^spark.jars.packages=org.elasticsearch:elasticsearch-spark_2.10:2.3.2,com.databricks:spark-avro_2.10:2.0.1
Обратите внимание на ^#^ прямо перед списком пакетов. Увидеть gcloud topic escaping
Больше подробностей.