How does Spark load a Python package that depends on an external library?
I plan to mask data in bulk with a UDF. The UDF calls ECC and AES to mask the data (a sketch follows the list below); the dependencies are packaged as:
- cryptography
- eciespy
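For context, a minimal sketch of such a masking UDF, assuming a hypothetical input column, an illustrative ECC public key, and hex output (none of these names are from my actual script; eciespy's ECIES already combines ECC key agreement with AES under the hood):

from ecies import encrypt                      # provided by the eciespy package
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

RECEIVER_PK_HEX = "04..."                      # illustrative ECC public key (hex), normally loaded from config

@udf(returnType=StringType())
def mask_value(value):
    # encrypt the plaintext with ECIES (ECC + AES) and return it hex-encoded
    if value is None:
        return None
    return encrypt(RECEIVER_PK_HEX, value.encode("utf-8")).hex()

# hypothetical usage: df has a column "sensitive_col"
df_result = df.withColumn("masked_col", mask_value("sensitive_col"))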
I get the following error:
Driver stacktrace:
22/03/21 11:30:52 INFO DAGScheduler: Job 1 failed: showString at NativeMethodAccessorImpl.java:0, took 1.766196 s
Traceback (most recent call last):
File "/home/hadoop/pyspark-dm.py", line 495, in <module>
df_result.show()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 485, in show
File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 117, in deco
pyspark.sql.utils.PythonException:
An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
File "/mnt/yarn/usercache/hadoop/appcache/application_1647852036838_0018/container_1647852036838_0018_01_000004/pyspark.zip/pyspark/worker.py", line 588, in main
func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
File "/mnt/yarn/usercache/hadoop/appcache/application_1647852036838_0018/container_1647852036838_0018_01_000004/pyspark.zip/pyspark/worker.py", line 447, in read_udfs
udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
File "/mnt/yarn/usercache/hadoop/appcache/application_1647852036838_0018/container_1647852036838_0018_01_000004/pyspark.zip/pyspark/worker.py", line 249, in read_single_udf
f, return_type = read_command(pickleSer, infile)
File "/mnt/yarn/usercache/hadoop/appcache/application_1647852036838_0018/container_1647852036838_0018_01_000004/pyspark.zip/pyspark/worker.py", line 69, in read_command
command = serializer._read_with_length(file)
File "/mnt/yarn/usercache/hadoop/appcache/application_1647852036838_0018/container_1647852036838_0018_01_000004/pyspark.zip/pyspark/serializers.py", line 160, in _read_with_length
return self.loads(obj)
File "/mnt/yarn/usercache/hadoop/appcache/application_1647852036838_0018/container_1647852036838_0018_01_000004/pyspark.zip/pyspark/serializers.py", line 430, in loads
return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'ecies'
I load the environment from an archive:
import os
from pyspark.sql import SparkSession

os.environ['PYSPARK_PYTHON'] = "./environment/bin/python"

# spark session initialization
spark = SparkSession.builder \
    .config('spark.sql.hive.metastore.sharedPrefixes', 'com.amazonaws.services.dynamodbv2') \
    .config('spark.sql.warehouse.dir', 'hdfs:///user/spark/warehouse') \
    .config('spark.sql.catalogImplementation', 'hive') \
    .config("spark.archives", "pyspark_venv.tar.gz#environment") \
    .getOrCreate()
I packaged the dependencies with venv-pack and uploaded the archive via spark-submit:
22/03/21 11:44:36 INFO SparkContext: Unpacking an archive pyspark_venv.tar.gz#environment from /mnt/tmp/spark-060999fd-4410-405d-8d15-1b832d09f86c/pyspark_venv.tar.gz to /mnt/tmp/spark-dc9e1f8b-5d91-4ccf-8f20-d85ed72e9eca/userFiles-1c03e075-1fb2-4ffd-a527-bb4d650e4df8/environment
When I run the PySpark script in local mode, it works fine.
Build the archive
- First, read the [documentation](https://spark.apache.org/docs/latest/api/python/user_guide/python_packaging.html#using-virtualenv) carefully.
- Second, create a virtual environment.
- Third, install the required packages.
- Finally, pack the virtual environment with venv-pack (see the command sketch after this list).
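A rough command sketch of those four steps; the environment name and the exact package list are assumptions based on the dependencies above, so adjust them to your own requirements:

python3 -m venv pyspark_venv
source pyspark_venv/bin/activate
pip install cryptography eciespy venv-pack
venv-pack -o pyspark_venv.tar.gz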
Modify the source code
from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf()
# point the executors at the Python interpreter inside the unpacked archive
conf.setExecutorEnv('PYSPARK_PYTHON', './environment/bin/python')
conf.set('spark.sql.hive.metastore.sharedPrefixes', 'com.amazonaws.services.dynamodbv2')
conf.set('spark.sql.warehouse.dir', 'hdfs:///user/spark/warehouse')
conf.set('spark.sql.catalogImplementation', 'hive')
# ship the packed virtualenv and unpack it as ./environment on every node
conf.set("spark.archives", "pyspark_venv.tar.gz#environment")
spark = SparkSession.builder.config(conf=conf).getOrCreate()
Submit the job
spark-submit --archives pyspark_venv.tar.gz#environment pyspark-dm.py