Py4Java: ImportError: нет модуля с именем numpy при запуске оболочки Python для Apache Spark

Я пытаюсь следовать живому кодированию в этом выступлении Apache Spark

Вот мой блокнот IPython, вплоть до момента, когда я сталкиваюсь с ошибкой:

демо spark + lightning в ноутбуке IPython

так что numpy установлен:

~ $ pip install numpy
Requirement already satisfied (use --upgrade to upgrade): 
numpy in ./anaconda/lib/python2.7/site-packages
Cleaning up...

Тем не менее, когда я бегу model = KMeans.train(data, k=5) команда, которая зависит от numpy, Py4Java библиотека, которую использует Spark, выдает эту ошибку. Как я могу сказать это py4j/protocol.py Сценарий Python в папке Spark для импорта из моей существующей установки numpy ./anaconda/lib/python2.7/site-packages?

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-15-2eb94be11344> in <module>()
----> 1 model = KMeans.train(data, k=5)

/Users/m/workspace/spark-1.2.0-bin-hadoop2.4/python/pyspark/mllib/clustering.py in train(cls, rdd, k, maxIterations, runs, initializationMode)
     82         """Train a k-means clustering model."""
     83         model = callMLlibFunc("trainKMeansModel", rdd.map(_convert_to_vector), k, maxIterations,
---> 84                               runs, initializationMode)
     85         centers = callJavaFunc(rdd.context, model.clusterCenters)
     86         return KMeansModel([c.toArray() for c in centers])

/Users/m/workspace/spark-1.2.0-bin-hadoop2.4/python/pyspark/mllib/common.py in callMLlibFunc(name, *args)
    120     sc = SparkContext._active_spark_context
    121     api = getattr(sc._jvm.PythonMLLibAPI(), name)
--> 122     return callJavaFunc(sc, api, *args)
    123 
    124 

/Users/m/workspace/spark-1.2.0-bin-hadoop2.4/python/pyspark/mllib/common.py in callJavaFunc(sc, func, *args)
    113     """ Call Java Function """
    114     args = [_py2java(sc, a) for a in args]
--> 115     return _java2py(sc, func(*args))
    116 
    117 

/Users/m/workspace/spark-1.2.0-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

/Users/m/workspace/spark-1.2.0-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling o18.trainKMeansModel.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 0.0 failed 1 times, most recent failure: Lost task 3.0 in stage 0.0 (TID 3, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/m/workspace/spark-1.2.0-bin-hadoop2.4/python/pyspark/worker.py", line 90, in main
    command = pickleSer._read_with_length(infile)
  File "/Users/m/workspace/spark-1.2.0-bin-hadoop2.4/python/pyspark/serializers.py", line 151, in _read_with_length
    return self.loads(obj)
  File "/Users/m/workspace/spark-1.2.0-bin-hadoop2.4/python/pyspark/serializers.py", line 396, in loads
    return cPickle.loads(obj)
  File "/Users/m/workspace/spark-1.2.0-bin-hadoop2.4/python/pyspark/mllib/__init__.py", line 24, in <module>
    import numpy
ImportError: No module named numpy

    at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137)
    at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
    at java.lang.Thread.run(Thread.java:695)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

1 ответ

Решение

Согласитесь, похоже, по какой-то причине версия Python, используемая iPython, не имеет клочья. Как вы запустили ipython / pyspark? Рекомендуемый подход - позвонить:

$SPARK_HOME/bin/pyspark

где $SPARK_HOME - это ваша установка Spark, после установки следующих переменных среды (путем ввода в вашу оболочку или добавления в ваш профиль bash):

export IPYTHON=1
export IPYTHON_OPTS="notebook" 

При необходимости вы можете указать исполняемый файл python, который использует Python, установив переменную PYSPARK_PYTHON.

Альтернативный вариант - запустить iPython без Spark, а затем импортировать контекст Spark вручную, как описано здесь: http://www.abisen.com/spark-from-ipython-notebook.html. Это может быть полезно, если у вас проблемы с версией.

Другие вопросы по тегам