如何在 Apache Spark (pyspark) 中使用自定义 类?
How to use custom classes with Apache Spark (pyspark)?
我写了一个 class 在 python 中实现了一个 classifier。我想使用 Apache Spark 使用这个 classifier 并行化 class 大量数据点的化。
- 我在一个有 10 个从节点的集群上使用 Amazon EC2 进行设置,基于一个带有 python 的 Anaconda 发行版的 ami。 ami 让我可以远程使用 IPython Notebook。
- 我已经在文件夹 /root/anaconda/lib/python2.7/ 中的主文件调用 BoTree.py 中定义了 class BoTree,这是我所有 python 模块所在的文件夹是
- 我已经检查过我可以导入和使用 BoTree.py 当 运行 命令行从 master 发出火花时(我只需要从编写 import BoTree 和我的 class BoTree 开始可用
- 我使用 spark 的 /root/spark-ec2/copy-dir.sh 脚本在我的集群中复制 /python2.7/ 目录。
- 我已经通过 ssh 连接到其中一个从站并在那里尝试 运行 ipython,并且能够导入 BoTree,所以我认为该模块已成功发送到集群中(我还可以在 .../python2.7/ 文件夹中看到 BoTree.py 文件)
- 在我检查过的主机上,我可以使用 cPickle 对 BoTree 实例进行 pickle 和 unpickle,我知道它是 pyspark 的序列化程序。
但是,当我执行以下操作时:
import BoTree
bo_tree = BoTree.train(data)
rdd = sc.parallelize(keyed_training_points) #create rdd of 10 (integer, (float, float) tuples
rdd = rdd.mapValues(lambda point, bt = bo_tree: bt.classify(point[0], point[1]))
out = rdd.collect()
Spark 失败并出现错误(我认为只是相关的部分):
File "/root/spark/python/pyspark/worker.py", line 90, in main
command = pickleSer.loads(command.value)
File "/root/spark/python/pyspark/serializers.py", line 405, in loads
return cPickle.loads(obj)
ImportError: No module named BoroughTree
谁能帮帮我?有点绝望...
谢谢
可能最简单的解决方案是在创建 SparkContext
时使用 pyFiles
参数
from pyspark import SparkContext
sc = SparkContext(master, app_name, pyFiles=['/path/to/BoTree.py'])
放置在那里的每个文件都将发送给工作人员并添加到 PYTHONPATH
。
如果您在交互模式下工作,则必须在创建新上下文之前使用 sc.stop()
停止现有上下文。
还要确保 Spark worker 实际上使用的是 Anaconda 发行版,而不是默认的 Python 解释器。根据你的描述,很有可能是这个问题。要设置 PYSPARK_PYTHON
,您可以使用 conf/spark-env.sh
个文件。
旁注将文件复制到 lib
是一个相当混乱的解决方案。如果您想避免使用 pyFiles
推送文件,我建议您创建普通 Python 包或 Conda 包并进行正确安装。通过这种方式,您可以轻松跟踪已安装的内容、删除不必要的软件包并避免一些难以调试的问题。
获取 SparkContext 后,还可以使用 addPyFile
随后向每个 worker 发送一个模块。
sc.addPyFile('/path/to/BoTree.py')
我写了一个 class 在 python 中实现了一个 classifier。我想使用 Apache Spark 使用这个 classifier 并行化 class 大量数据点的化。
- 我在一个有 10 个从节点的集群上使用 Amazon EC2 进行设置,基于一个带有 python 的 Anaconda 发行版的 ami。 ami 让我可以远程使用 IPython Notebook。
- 我已经在文件夹 /root/anaconda/lib/python2.7/ 中的主文件调用 BoTree.py 中定义了 class BoTree,这是我所有 python 模块所在的文件夹是
- 我已经检查过我可以导入和使用 BoTree.py 当 运行 命令行从 master 发出火花时(我只需要从编写 import BoTree 和我的 class BoTree 开始可用
- 我使用 spark 的 /root/spark-ec2/copy-dir.sh 脚本在我的集群中复制 /python2.7/ 目录。
- 我已经通过 ssh 连接到其中一个从站并在那里尝试 运行 ipython,并且能够导入 BoTree,所以我认为该模块已成功发送到集群中(我还可以在 .../python2.7/ 文件夹中看到 BoTree.py 文件)
- 在我检查过的主机上,我可以使用 cPickle 对 BoTree 实例进行 pickle 和 unpickle,我知道它是 pyspark 的序列化程序。
但是,当我执行以下操作时:
import BoTree
bo_tree = BoTree.train(data)
rdd = sc.parallelize(keyed_training_points) #create rdd of 10 (integer, (float, float) tuples
rdd = rdd.mapValues(lambda point, bt = bo_tree: bt.classify(point[0], point[1]))
out = rdd.collect()
Spark 失败并出现错误(我认为只是相关的部分):
File "/root/spark/python/pyspark/worker.py", line 90, in main
command = pickleSer.loads(command.value)
File "/root/spark/python/pyspark/serializers.py", line 405, in loads
return cPickle.loads(obj)
ImportError: No module named BoroughTree
谁能帮帮我?有点绝望...
谢谢
可能最简单的解决方案是在创建 SparkContext
pyFiles
参数
from pyspark import SparkContext
sc = SparkContext(master, app_name, pyFiles=['/path/to/BoTree.py'])
放置在那里的每个文件都将发送给工作人员并添加到 PYTHONPATH
。
如果您在交互模式下工作,则必须在创建新上下文之前使用 sc.stop()
停止现有上下文。
还要确保 Spark worker 实际上使用的是 Anaconda 发行版,而不是默认的 Python 解释器。根据你的描述,很有可能是这个问题。要设置 PYSPARK_PYTHON
,您可以使用 conf/spark-env.sh
个文件。
旁注将文件复制到 lib
是一个相当混乱的解决方案。如果您想避免使用 pyFiles
推送文件,我建议您创建普通 Python 包或 Conda 包并进行正确安装。通过这种方式,您可以轻松跟踪已安装的内容、删除不必要的软件包并避免一些难以调试的问题。
获取 SparkContext 后,还可以使用 addPyFile
随后向每个 worker 发送一个模块。
sc.addPyFile('/path/to/BoTree.py')