Py4Java: ImportError: No module named numpy when running Python shell for Apache Spark


I'm trying to follow along with the live coding in this Apache Spark talk.

Here is my IPython notebook, up to the point where I hit the error:

So numpy is installed:

~ $ pip install numpy
Requirement already satisfied (use --upgrade to upgrade): numpy in ./anaconda/lib/python2.7/site-packages
Cleaning up...

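However, when I run the command model = KMeans.train(data, k=5), which depends on numpy, the Py4Java library that Spark uses throws the error below. How do I tell the py4j/protocol.py Python script in the Spark folder to import numpy from my existing installation at ./anaconda/lib/python2.7/site-packages?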

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-15-2eb94be11344> in <module>()
----> 1 model = KMeans.train(data, k=5)

/Users/m/workspace/spark-1.2.0-bin-hadoop2.4/python/pyspark/mllib/clustering.py in train(cls, rdd, k, maxIterations, runs, initializationMode)
     82         """Train a k-means clustering model."""
     83         model = callMLlibFunc("trainKMeansModel", rdd.map(_convert_to_vector), k, maxIterations,
---> 84                               runs, initializationMode)
     85         centers = callJavaFunc(rdd.context, model.clusterCenters)
     86         return KMeansModel([c.toArray() for c in centers])

/Users/m/workspace/spark-1.2.0-bin-hadoop2.4/python/pyspark/mllib/common.py in callMLlibFunc(name, *args)
    120     sc = SparkContext._active_spark_context
    121     api = getattr(sc._jvm.PythonMLLibAPI(), name)
--> 122     return callJavaFunc(sc, api, *args)
    123 
    124 

/Users/m/workspace/spark-1.2.0-bin-hadoop2.4/python/pyspark/mllib/common.py in callJavaFunc(sc, func, *args)
    113     """ Call Java Function """
    114     args = [_py2java(sc, a) for a in args]
--> 115     return _java2py(sc, func(*args))
    116 
    117 

/Users/m/workspace/spark-1.2.0-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

/Users/m/workspace/spark-1.2.0-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling o18.trainKMeansModel.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 0.0 failed 1 times, most recent failure: Lost task 3.0 in stage 0.0 (TID 3, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/m/workspace/spark-1.2.0-bin-hadoop2.4/python/pyspark/worker.py", line 90, in main
    command = pickleSer._read_with_length(infile)
  File "/Users/m/workspace/spark-1.2.0-bin-hadoop2.4/python/pyspark/serializers.py", line 151, in _read_with_length
    return self.loads(obj)
  File "/Users/m/workspace/spark-1.2.0-bin-hadoop2.4/python/pyspark/serializers.py", line 396, in loads
    return cPickle.loads(obj)
  File "/Users/m/workspace/spark-1.2.0-bin-hadoop2.4/python/pyspark/mllib/__init__.py", line 24, in <module>
    import numpy
ImportError: No module named numpy

    at org.apache.spark.api.python.PythonRDD$$anon.read(PythonRDD.scala:137)
    at org.apache.spark.api.python.PythonRDD$$anon.<init>(PythonRDD.scala:174)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
    at java.lang.Thread.run(Thread.java:695)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1203)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1202)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:696)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive.applyOrElse(DAGScheduler.scala:1420)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Agreed: for some reason, the Python version that iPython is using doesn't seem to have numpy. How are you launching ipython / pyspark? The recommended way is to call:

$SPARK_HOME/bin/pyspark

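where $SPARK_HOME is your Spark installation directory, after setting the following environment variables (either by typing them into your shell or by adding them to your bash profile):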

export IPYTHON=1
export IPYTHON_OPTS="notebook" 

If necessary, you can specify which python executable Spark uses by setting the variable PYSPARK_PYTHON.
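If you start IPython on its own and create the SparkContext from the notebook, the same variable can be set from Python before the context is created. Here is a rough sketch of that idea. The helper names has_module and interpreter_report are made up for this illustration, and using sys.executable assumes the interpreter running the notebook is the one that has numpy. As far as I know, PySpark reads PYSPARK_PYTHON when the context is constructed, so setting it later would have no effect on an existing context.

```python
import os
import sys


def has_module(name):
    """Return True if `name` can be imported by the interpreter running this code."""
    try:
        __import__(name)
        return True
    except ImportError:
        return False


def interpreter_report():
    """Describe the current interpreter: its path and whether numpy is importable."""
    return {"executable": sys.executable, "numpy": has_module("numpy")}


# Assumption: the interpreter running this code is the one with numpy installed.
# setdefault() leaves an existing PYSPARK_PYTHON value from the shell untouched.
os.environ.setdefault("PYSPARK_PYTHON", sys.executable)

print(interpreter_report())
print("PYSPARK_PYTHON =", os.environ["PYSPARK_PYTHON"])
```

If the report shows numpy as False, the notebook itself is running under a Python without numpy, and PYSPARK_PYTHON should point at the anaconda interpreter instead.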

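Another approach is to start iPython without Spark and then import the Spark context manually, as described here: http://www.abisen.com/spark-from-ipython-notebook.html. This may be useful if you are running into version issues.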
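A minimal sketch of what the manual route could look like, assuming SPARK_HOME is set in the environment and the bundled Py4J zip lives under python/lib, as in the traceback in the question. The function names spark_python_paths, add_spark_to_path and make_context are hypothetical, and the master URL and app name are placeholders. This is not necessarily the exact method from the linked post.

```python
import glob
import os
import sys


def spark_python_paths(spark_home):
    """Return the paths that need to be on sys.path before pyspark can be imported."""
    paths = [os.path.join(spark_home, "python")]
    # Py4J ships with Spark as a source zip, e.g. python/lib/py4j-0.8.2.1-src.zip
    pattern = os.path.join(spark_home, "python", "lib", "py4j-*-src.zip")
    paths.extend(sorted(glob.glob(pattern)))
    return paths


def add_spark_to_path(spark_home):
    """Prepend Spark's Python sources to sys.path, skipping entries already present."""
    for path in spark_python_paths(spark_home):
        if path not in sys.path:
            sys.path.insert(0, path)


def make_context(spark_home, master="local[*]", app_name="notebook"):
    """Create a SparkContext from a plain Python/IPython session (placeholder settings)."""
    add_spark_to_path(spark_home)
    from pyspark import SparkContext  # importable only after the path setup above
    return SparkContext(master, app_name)


spark_home = os.environ.get("SPARK_HOME")
if spark_home:
    print(spark_python_paths(spark_home))
else:
    print("SPARK_HOME is not set")
```

In a notebook you would then call sc = make_context(os.environ["SPARK_HOME"]). Because the session runs under whatever Python started IPython, it is easier to see which interpreter and which numpy are actually in play.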