How to use an external (custom) package in pyspark?
I am trying to replicate the solution given here https://www.cloudera.com/documentation/enterprise/5-7-x/topics/spark_python.html
to import an external package in pyspark, but it fails.
My code:
spark_distro.py
from pyspark import SparkContext, SparkConf

def import_my_special_package(x):
    from external_package import external
    return external.fun(x)

conf = SparkConf()
sc = SparkContext()
int_rdd = sc.parallelize([1, 2, 3, 4])
int_rdd.map(lambda x: import_my_special_package(x)).collect()
external_package.py
class external:
    def __init__(self, val):
        self.val = val
    def fun(self, val):
        return self.val * 3
spark-submit command:
spark-submit \
--master yarn \
/path to script/spark_distro.py \
--py-files /path to script/external_package.py \
1000
Actual error:
vs = list(itertools.islice(iterator, batch))
File "/home/gsurapur/pyspark_examples/spark_distro.py", line 13, in <lambda>
File "/home/gsurapur/pyspark_examples/spark_distro.py", line 6, in import_my_special_package
ImportError: No module named external_package
Expected output:
[3,6,9,12]
I also tried the sc.addPyFile option, but it failed with the same issue.
I know that, in hindsight, it sounds silly, but the order of the arguments in spark-submit is not in general interchangeable: all Spark-related arguments, including --py-files, must come before the script to be executed:
# your case:
spark-submit --master yarn-client /home/ctsats/scripts/SO/spark_distro.py --py-files /home/ctsats/scripts/SO/external_package.py
[...]
ImportError: No module named external_package
# correct usage:
spark-submit --master yarn-client --py-files /home/ctsats/scripts/SO/external_package.py /home/ctsats/scripts/SO/spark_distro.py
[...]
[3, 6, 9, 12]
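(Incidentally, and as my own addition rather than part of the original exchange: --py-files also accepts .zip and .egg archives, which becomes handy once external_package grows into a real package with several modules. A minimal sketch, assuming a hypothetical directory external_package/ containing an __init__.py:

import zipfile

# Build an archive that preserves the package's top-level folder, so that
# "import external_package" still resolves on the executors.
with zipfile.ZipFile("external_package.zip", "w") as zf:
    zf.write("external_package/__init__.py")
    # zf.write("external_package/extra_module.py")  # add further modules as needed

The archive is then shipped the same way, again before the script:

spark-submit --master yarn-client --py-files external_package.zip /home/ctsats/scripts/SO/spark_distro.py)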
Testing your scripts, modified as follows:
spark_distro.py
from pyspark import SparkContext, SparkConf

def import_my_special_package(x):
    from external_package import external
    return external(x)

conf = SparkConf()
sc = SparkContext()
int_rdd = sc.parallelize([1, 2, 3, 4])
print(int_rdd.map(lambda x: import_my_special_package(x)).collect())
external_package.py
def external(x):
    return x * 3
The modifications arguably do not change the essence of the problem...
Here is the situation with addPyFile:
spark_distro2.py
from pyspark import SparkContext, SparkConf

def import_my_special_package(x):
    from external_package import external
    return external(x)

conf = SparkConf()
sc = SparkContext()
sc.addPyFile("/home/ctsats/scripts/SO/external_package.py")  # added
int_rdd = sc.parallelize([1, 2, 3, 4])
print(int_rdd.map(lambda x: import_my_special_package(x)).collect())
Test:
spark-submit --master yarn-client /home/ctsats/scripts/SO/spark_distro2.py
[...]
[3, 6, 9, 12]
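As a final sanity check (my addition, not in the original answer): you can confirm from inside a task that the shipped module is actually visible on the executors, for example by returning its location:

def module_location(_):
    # Import on the executor and report where the module was loaded from.
    import external_package
    return external_package.__file__

print(sc.parallelize([0]).map(module_location).collect())

If the file was distributed correctly (either via --py-files or sc.addPyFile), this prints the executor-side path of external_package.py instead of raising ImportError.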