有没有办法强制 spark worker 使用分布式 numpy 版本而不是安装在他们身上的版本?
Is there a way to force spark workers to use a distributed numpy version instead of the one installed on them?
情况如下:在使用 spark 2.3 的企业集群上工作,我想 运行 pandas_udf 需要 pyarrow 需要 numpy 0.14 (AFAIK)。
能够分发 pyarrow(我认为,无法 100% 验证这一点):
pyspark.sql.SparkSession.builder.appName("pandas_udf_poc").config("spark.executor.instances","2")\
.config("spark.executor.memory","8g")\
.config("spark.driver.memory","8g")\
.config("spark.driver.maxResultSize","8g")\
.config("py-files", "pyarrow_depnd.zip")\
.getOrCreate()
spark.sparkContext.addPyFile("pyarrow_depnd.zip")
zip 是 pip 安装到目录并压缩的结果。
但 pyarrow 不与节点 numpy 0.13 一起玩,我想我可以尝试向所有节点分发完整的环境,但我的问题是,有没有办法避免这种情况并使节点使用不同的numpy(已在 pyarrow zip 中分发)
谢谢
好吧,最后,不必使用虚拟环境,但无法避免向所有节点分发 python(包含所需的依赖项)的完整副本。
首先构建了 python 的完整副本(确实使用了 conda env,但您可能会使用其他方式):
conda create --prefix /home/me/env_conda_for_pyarrow
source activate /home/me/env_conda_for_pyarrow
conda install numpy
conda install pyarrow
在这种特定情况下,必须在安装之前打开 conda-forge 频道,以获取最新版本。
其次,压缩分发:
zip -r env_conda_for_pyarrow.zip env_conda_for_pyarrow
然后使用 archives 分发 zip 和 env var PYSPARK_PYTHON 指向它:
import os, sys
os.environ['PYSPARK_PYTHON']="dist_python/env_conda_for_pyarrow/bin/python"
import pyspark
spark = \
pyspark.sql.SparkSession.builder.appName("pysaprk_python")\
.config("spark.yarn.dist.archives", "env_conda_for_pyarrow.zip#dist_python")\
.getOrCreate()
print spark.version, spark.sparkContext.master
就这样,完成了。以下是我用于测试的一些脚本:
def list_nodes_dir(x): # hack to see workers file dirs
import os
return os.listdir('dist_python')
spark.sparkContext.parallelize(range(1), 1).map(list_nodes_dir).collect()
def npv(x): # hack to see workers numpy version
import numpy as np
return np.__version__
set(spark.sparkContext.parallelize(range(10), 10).map(npv).collect())
# spark documents example
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import IntegerType, StringType
slen = pandas_udf(lambda s: s.str.len(), IntegerType())
@pandas_udf(StringType())
def to_upper(s):
return s.str.upper()
@pandas_udf("integer", PandasUDFType.SCALAR)
def add_one(x):
return x + 1
df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
df.select(slen("name").alias("slen(name)"), to_upper("name"),
add_one("age")).show()
情况如下:在使用 spark 2.3 的企业集群上工作,我想 运行 pandas_udf 需要 pyarrow 需要 numpy 0.14 (AFAIK)。 能够分发 pyarrow(我认为,无法 100% 验证这一点):
pyspark.sql.SparkSession.builder.appName("pandas_udf_poc").config("spark.executor.instances","2")\
.config("spark.executor.memory","8g")\
.config("spark.driver.memory","8g")\
.config("spark.driver.maxResultSize","8g")\
.config("py-files", "pyarrow_depnd.zip")\
.getOrCreate()
spark.sparkContext.addPyFile("pyarrow_depnd.zip")
zip 是 pip 安装到目录并压缩的结果。
但 pyarrow 不与节点 numpy 0.13 一起玩,我想我可以尝试向所有节点分发完整的环境,但我的问题是,有没有办法避免这种情况并使节点使用不同的numpy(已在 pyarrow zip 中分发)
谢谢
好吧,最后,不必使用虚拟环境,但无法避免向所有节点分发 python(包含所需的依赖项)的完整副本。
首先构建了 python 的完整副本(确实使用了 conda env,但您可能会使用其他方式):
conda create --prefix /home/me/env_conda_for_pyarrow
source activate /home/me/env_conda_for_pyarrow
conda install numpy
conda install pyarrow
在这种特定情况下,必须在安装之前打开 conda-forge 频道,以获取最新版本。
其次,压缩分发:
zip -r env_conda_for_pyarrow.zip env_conda_for_pyarrow
然后使用 archives 分发 zip 和 env var PYSPARK_PYTHON 指向它:
import os, sys
os.environ['PYSPARK_PYTHON']="dist_python/env_conda_for_pyarrow/bin/python"
import pyspark
spark = \
pyspark.sql.SparkSession.builder.appName("pysaprk_python")\
.config("spark.yarn.dist.archives", "env_conda_for_pyarrow.zip#dist_python")\
.getOrCreate()
print spark.version, spark.sparkContext.master
就这样,完成了。以下是我用于测试的一些脚本:
def list_nodes_dir(x): # hack to see workers file dirs
import os
return os.listdir('dist_python')
spark.sparkContext.parallelize(range(1), 1).map(list_nodes_dir).collect()
def npv(x): # hack to see workers numpy version
import numpy as np
return np.__version__
set(spark.sparkContext.parallelize(range(10), 10).map(npv).collect())
# spark documents example
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import IntegerType, StringType
slen = pandas_udf(lambda s: s.str.len(), IntegerType())
@pandas_udf(StringType())
def to_upper(s):
return s.str.upper()
@pandas_udf("integer", PandasUDFType.SCALAR)
def add_one(x):
return x + 1
df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
df.select(slen("name").alias("slen(name)"), to_upper("name"),
add_one("age")).show()