Как заставить Spark, Python и MongoDB работать вместе
Я испытываю трудности при правильном соединении этих компонентов. У меня установлена и успешно работает Spark, я могу запускать задания локально, автономно, а также через YARN. Я следовал советам (насколько мне известно) здесь и здесь
Я работаю над Ubuntu, и у меня есть разные версии компонентов
- Spark Spark -1.5.1-bin-hadoop2.6
- Hadoop hadoop-2.6.1
- Монго 2.6.10
- Разъем Mongo-Hadoop, клонированный с https://github.com/mongodb/mongo-hadoop.git
- Python 2.7.10
У меня были некоторые трудности с выполнением различных шагов, таких как, какие банки добавить к какому пути, так что я добавил
- в
/usr/local/share/hadoop-2.6.1/share/hadoop/mapreduce
я добавилmongo-hadoop-core-1.5.0-SNAPSHOT.jar
- следующие переменные среды
export HADOOP_HOME="/usr/local/share/hadoop-2.6.1"
export PATH=$PATH:$HADOOP_HOME/bin
export SPARK_HOME="/usr/local/share/spark-1.5.1-bin-hadoop2.6"
export PYTHONPATH="/usr/local/share/mongo-hadoop/spark/src/main/python"
export PATH=$PATH:$SPARK_HOME/bin
Моя программа на Python является базовой
from pyspark import SparkContext, SparkConf
import pymongo_spark
pymongo_spark.activate()
def main():
conf = SparkConf().setAppName("pyspark test")
sc = SparkContext(conf=conf)
rdd = sc.mongoRDD(
'mongodb://username:password@localhost:27017/mydb.mycollection')
if __name__ == '__main__':
main()
Я запускаю его с помощью команды
$SPARK_HOME/bin/spark-submit --driver-class-path /usr/local/share/mongo-hadoop/spark/build/libs/ --master local[4] ~/sparkPythonExample/SparkPythonExample.py
и я получаю следующий вывод в результате
Traceback (most recent call last):
File "/home/me/sparkPythonExample/SparkPythonExample.py", line 24, in <module>
main()
File "/home/me/sparkPythonExample/SparkPythonExample.py", line 17, in main
rdd = sc.mongoRDD('mongodb://username:password@localhost:27017/mydb.mycollection')
File "/usr/local/share/mongo-hadoop/spark/src/main/python/pymongo_spark.py", line 161, in mongoRDD
return self.mongoPairRDD(connection_string, config).values()
File "/usr/local/share/mongo-hadoop/spark/src/main/python/pymongo_spark.py", line 143, in mongoPairRDD
_ensure_pickles(self)
File "/usr/local/share/mongo-hadoop/spark/src/main/python/pymongo_spark.py", line 80, in _ensure_pickles
orig_tb)
py4j.protocol.Py4JError
Согласно здесь
Это исключение возникает, когда возникает исключение в клиентском коде Java. Например, если вы попытаетесь извлечь элемент из пустого стека. Экземпляр сгенерированного исключения Java хранится в элементе java_exception.
Глядя на исходный код для pymongo_spark.py
и строка выбрасывает ошибку, это говорит
"Ошибка связи с JVM. Является ли банка MongoDB Spark на CLASSPATH в Spark?"
Поэтому в ответ я постарался убедиться, что правильные банки пропущены, но, возможно, я все делаю неправильно, см. Ниже.
$SPARK_HOME/bin/spark-submit --jars /usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar,/usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-java-driver-3.0.4.jar --driver-class-path /usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-java-driver-3.0.4.jar,/usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar --master local[4] ~/sparkPythonExample/SparkPythonExample.py
Я импортировал pymongo
к той же самой программе на Python, чтобы убедиться, что я могу по крайней мере получить доступ к MongoDB, используя это, и я могу.
Я знаю, что здесь довольно много движущихся частей, поэтому, если я смогу предоставить более полезную информацию, пожалуйста, дайте мне знать.
4 ответа
Обновления:
2016-07-04
С момента последнего обновления MongoDB Spark Connector достаточно повзрослел. Он предоставляет современные двоичные файлы и API на основе источников данных, но использует SparkConf
конфигурация, поэтому она субъективно менее гибкая, чем Stratio / Spark-MongoDB.
2016-03-30
С момента первоначального ответа я нашел два разных способа подключения к MongoDB от Spark:
Хотя первый кажется относительно незрелым, последний выглядит намного лучше, чем коннектор Mongo-Hadoop, и предоставляет Spark SQL API.
# Adjust Scala and package version according to your setup
# although officially 0.11 supports only Spark 1.5
# I haven't encountered any issues on 1.6.1
bin/pyspark --packages com.stratio.datasource:spark-mongodb_2.11:0.11.0
df = (sqlContext.read
.format("com.stratio.datasource.mongodb")
.options(host="mongo:27017", database="foo", collection="bar")
.load())
df.show()
## +---+----+--------------------+
## | x| y| _id|
## +---+----+--------------------+
## |1.0|-1.0|56fbe6f6e4120712c...|
## |0.0| 4.0|56fbe701e4120712c...|
## +---+----+--------------------+
Кажется, он намного стабильнее, чем mongo-hadoop-spark
, поддерживает предикат pushdown без статической конфигурации и просто работает.
Оригинальный ответ:
Действительно, здесь довольно много движущихся частей. Я попытался сделать его немного более управляемым, создав простой образ Docker, который примерно соответствует описанной конфигурации (хотя для краткости я опустил библиотеки Hadoop). Вы можете найти полный источник наGitHub
( DOI 10.5281 / zenodo.47882) и создайте его с нуля:
git clone https://github.com/zero323/docker-mongo-spark.git
cd docker-mongo-spark
docker build -t zero323/mongo-spark .
или загрузите изображение, которое я отправил в Docker Hub, чтобы вы могли просто docker pull zero323/mongo-spark
):
Начальные изображения:
docker run -d --name mongo mongo:2.6
docker run -i -t --link mongo:mongo zero323/mongo-spark /bin/bash
Запустить прохождение оболочки PySpark --jars
а также --driver-class-path
:
pyspark --jars ${JARS} --driver-class-path ${SPARK_DRIVER_EXTRA_CLASSPATH}
И, наконец, посмотрим, как это работает:
import pymongo
import pymongo_spark
mongo_url = 'mongodb://mongo:27017/'
client = pymongo.MongoClient(mongo_url)
client.foo.bar.insert_many([
{"x": 1.0, "y": -1.0}, {"x": 0.0, "y": 4.0}])
client.close()
pymongo_spark.activate()
rdd = (sc.mongoRDD('{0}foo.bar'.format(mongo_url))
.map(lambda doc: (doc.get('x'), doc.get('y'))))
rdd.collect()
## [(1.0, -1.0), (0.0, 4.0)]
Обратите внимание, что mongo-hadoop закрывает соединение после первого действия. Так к примеру rdd.count()
после сбора скинут исключение.
Основываясь на разных проблемах, с которыми я столкнулся при создании этого образа, я склонен полагать, что прохождение mongo-hadoop-1.5.0-SNAPSHOT.jar
а также mongo-hadoop-spark-1.5.0-SNAPSHOT.jar
как для --jars
а также --driver-class-path
это единственное жесткое требование.
Примечания:
- Это изображение свободно основано на https://github.com/jaceklaskowski/docker-spark, поэтому, пожалуйста, не забудьте послать хорошую карму на Jacek Laskowski, если это поможет.
- Если не требуется версия для разработки, включающая новый API, используйте
--packages
скорее всего, лучший вариант.
Можете ли вы попробовать использовать --package
вариант вместо --jars ...
в вашей команде spark-submit:
spark-submit --packages org.mongodb.mongo-hadoop:mongo-hadoop-core:1.3.1,org.mongodb:mongo-java-driver:3.1.0 [REST OF YOUR OPTIONS]
Некоторые из этих jar-файлов не являются Uber-jar-файлами и нуждаются в большем количестве зависимостей для загрузки, прежде чем они смогут работать.
У меня была такая же проблема вчера. Был в состоянии исправить это, поместив mongo-java-driver.jar
в $HADOOP_HOME/lib
а также mongo-hadoop-core.jar
а также mongo-hadoop-spark.jar
в $HADOOP_HOME/spark/classpath/emr
(Или любая другая папка, которая находится в $SPARK_CLASSPATH
).
Дайте мне знать, если это поможет.
Удачи!
@ смотри https://github.com/mongodb/mongo-hadoop/wiki/Spark-Usage
from pyspark import SparkContext, SparkConf
import pymongo_spark
# Important: activate pymongo_spark.
pymongo_spark.activate()
def main():
conf = SparkConf().setAppName("pyspark test")
sc = SparkContext(conf=conf)
# Create an RDD backed by the MongoDB collection.
# This RDD *does not* contain key/value pairs, just documents.
# If you want key/value pairs, use the mongoPairRDD method instead.
rdd = sc.mongoRDD('mongodb://localhost:27017/db.collection')
# Save this RDD back to MongoDB as a different collection.
rdd.saveToMongoDB('mongodb://localhost:27017/db.other.collection')
# You can also read and write BSON:
bson_rdd = sc.BSONFileRDD('/path/to/file.bson')
bson_rdd.saveToBSON('/path/to/bson/output')
if __name__ == '__main__':
main()