How do I get Spark to load a Python package that depends on an external library?

I plan to mask data in bulk with a UDF. The UDF calls ECC and AES to mask the data; the wrapper is along these lines:
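
(A minimal sketch of such a masking UDF; the eciespy encrypt() call, the hex output, and the key handling are illustrative assumptions, and the AES layer is omitted.)

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def make_mask_udf(receiver_pk_hex):
    def mask(value):
        # Import inside the UDF so the module is resolved on the executor,
        # not only on the driver.
        from ecies import encrypt
        if value is None:
            return None
        # ECC (ECIES) encryption of the cell value, returned as hex so it
        # fits in a string column.
        return encrypt(receiver_pk_hex, value.encode("utf-8")).hex()
    return udf(mask, StringType())

# e.g. df_result = df.withColumn("name_masked", make_mask_udf(pk_hex)(df["name"]))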

I got the following error:

Driver stacktrace:
22/03/21 11:30:52 INFO DAGScheduler: Job 1 failed: showString at NativeMethodAccessorImpl.java:0, took 1.766196 s
Traceback (most recent call last):
  File "/home/hadoop/pyspark-dm.py", line 495, in <module>
    df_result.show()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 485, in show
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 117, in deco
pyspark.sql.utils.PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/mnt/yarn/usercache/hadoop/appcache/application_1647852036838_0018/container_1647852036838_0018_01_000004/pyspark.zip/pyspark/worker.py", line 588, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1647852036838_0018/container_1647852036838_0018_01_000004/pyspark.zip/pyspark/worker.py", line 447, in read_udfs
    udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
  File "/mnt/yarn/usercache/hadoop/appcache/application_1647852036838_0018/container_1647852036838_0018_01_000004/pyspark.zip/pyspark/worker.py", line 249, in read_single_udf
    f, return_type = read_command(pickleSer, infile)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1647852036838_0018/container_1647852036838_0018_01_000004/pyspark.zip/pyspark/worker.py", line 69, in read_command
    command = serializer._read_with_length(file)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1647852036838_0018/container_1647852036838_0018_01_000004/pyspark.zip/pyspark/serializers.py", line 160, in _read_with_length
    return self.loads(obj)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1647852036838_0018/container_1647852036838_0018_01_000004/pyspark.zip/pyspark/serializers.py", line 430, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'ecies'

I loaded the environment from an archive:

os.environ['PYSPARK_PYTHON'] = "./environment/bin/python"
# spark session initialization
spark = (SparkSession.builder
         .config('spark.sql.hive.metastore.sharedPrefixes', 'com.amazonaws.services.dynamodbv2')
         .config('spark.sql.warehouse.dir', 'hdfs:///user/spark/warehouse')
         .config('spark.sql.catalogImplementation', 'hive')
         .config("spark.archives", "pyspark_venv.tar.gz#environment")
         .getOrCreate())

I packaged the dependency libraries with venv-pack and uploaded the archive via spark-submit:

22/03/21 11:44:36 INFO SparkContext: Unpacking an archive pyspark_venv.tar.gz#environment from /mnt/tmp/spark-060999fd-4410-405d-8d15-1b832d09f86c/pyspark_venv.tar.gz to /mnt/tmp/spark-dc9e1f8b-5d91-4ccf-8f20-d85ed72e9eca/userFiles-1c03e075-1fb2-4ffd-a527-bb4d650e4df8/environment

When I run the PySpark script in local mode, it works fine.

Build the archive
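
Roughly: with the virtualenv that contains ecies and its dependencies activated, pack it into the archive that spark.archives references. A sketch assuming venv-pack's Python API (equivalent to the venv-pack -o pyspark_venv.tar.gz CLI):

import venv_pack

# Pack the currently active virtualenv into the archive shipped to executors.
venv_pack.pack(output="pyspark_venv.tar.gz")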

Modify the source code

conf = SparkConf()
conf.setExecutorEnv('PYSPARK_PYTHON', './environment/bin/python')
conf.set('spark.sql.hive.metastore.sharedPrefixes', 'com.amazonaws.services.dynamodbv2')
conf.set('spark.sql.warehouse.dir', 'hdfs:///user/spark/warehouse')
conf.set('spark.sql.catalogImplementation', 'hive')
conf.set("spark.archives", "pyspark_venv.tar.gz#environment")
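
The conf is then handed to the session builder; a sketch, assuming the standard config(conf=...) hook:

from pyspark.sql import SparkSession

# Build the session from the SparkConf above.
spark = SparkSession.builder.config(conf=conf).getOrCreate()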

Submit the job

spark-submit pyspark-dm.py --archives pyspark_env.tar.gz