Pandas UDF 的 PySpark 环境设置
PySpark Environment Setup for Pandas UDF
-编辑-
这个简单的示例只显示了 3 条记录,但我需要对数十亿条记录执行此操作,因此我需要使用 Pandas UDF,而不是仅仅将 Spark DF 转换为 Pandas DF 和使用简单的应用。
输入数据
期望的输出
-结束编辑-
我一直在用头撞墙试图解决这个问题,我希望有人能帮助我解决这个问题。我正在尝试将 PySpark 数据框中的纬度/经度值转换为 Uber 的 H3 十六进制系统。这是对函数 h3.geo_to_h3(lat=lat, lng=lon, resolution=7)
的非常直接的使用。但是我的 PySpark 集群一直有问题。
我正在按照数据块文章 here 中的描述设置我的 PySpark 集群,使用以下命令:
conda create -y -n pyspark_conda_env -c conda-forge pyarrow pandas h3 numpy python=3.7 conda-pack
conda init --all
然后关闭并重新打开终端 window
conda activate pyspark_conda_env
conda pack -f -o pyspark_conda_env.tar.gz
我在我的 jupyter notebook 中包含我在创建 spark 集群时创建的 tar.gz 文件,就像这样 spark = SparkSession.builder.master("yarn").appName("test").config("spark.yarn.dist.archives","<path>/pyspark_conda_env.tar.gz#environment").getOrCreate()
我的 pandas udf 设置如下,我能够在单节点 spark 集群上工作,但现在在具有多个工作节点的集群上遇到问题:
#create udf to convert lat lon to h3 hex
def convert_to_h3(lat : pd.Series, lon : pd.Series) -> pd.Series:
import h3 as h3
import numpy as np
if ((None in [lat, lon]) | (np.isnan(lat))):
return None
else:
return (h3.geo_to_h3(lat=lat, lng=lon, resolution=7))
@f.pandas_udf('string', f.PandasUDFType.SCALAR)
def udf_convert_to_h3(lat : pd.Series, lon : pd.Series) -> pd.Series:
import pandas as pd
import numpy as np
df = pd.DataFrame({'lat' : lat, 'lon' : lon})
df['h3_res7'] = df.apply(lambda x : convert_to_h3(x['lat'], x['lon']), axis = 1)
return df['h3_res7']
在使用 pandas udf 创建新列并尝试查看它之后:
trip_starts = trip_starts.withColumn('h3_res7', udf_convert_to_h3(f.col('latitude'), f.col('longitude')))
我收到以下错误:
21/07/15 20:05:22 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 139 for reason Container marked as failed: container_1626376534301_0015_01_000158 on host: ip-xx-xxx-xx-xxx.aws.com. Exit status: -100. Diagnostics: Container released on a *lost* node.
我不确定在这里做什么,因为我已经尝试将记录数量缩减到更易于管理的数量,但仍在 运行 解决这个问题。理想情况下,我想弄清楚如何使用 PySpark 环境,如 databricks 博客 post 中所述,我链接而不是 运行ning bootstrap 由于公司政策而启动集群时的脚本使 bootstrap 脚本更难 运行。
我最终通过将我的数据重新分区为更小的分区来解决这个问题,每个分区中的记录更少。这解决了我的问题。
-编辑-
这个简单的示例只显示了 3 条记录,但我需要对数十亿条记录执行此操作,因此我需要使用 Pandas UDF,而不是仅仅将 Spark DF 转换为 Pandas DF 和使用简单的应用。
输入数据
期望的输出
-结束编辑-
我一直在用头撞墙试图解决这个问题,我希望有人能帮助我解决这个问题。我正在尝试将 PySpark 数据框中的纬度/经度值转换为 Uber 的 H3 十六进制系统。这是对函数 h3.geo_to_h3(lat=lat, lng=lon, resolution=7)
的非常直接的使用。但是我的 PySpark 集群一直有问题。
我正在按照数据块文章 here 中的描述设置我的 PySpark 集群,使用以下命令:
conda create -y -n pyspark_conda_env -c conda-forge pyarrow pandas h3 numpy python=3.7 conda-pack
conda init --all
然后关闭并重新打开终端 windowconda activate pyspark_conda_env
conda pack -f -o pyspark_conda_env.tar.gz
我在我的 jupyter notebook 中包含我在创建 spark 集群时创建的 tar.gz 文件,就像这样 spark = SparkSession.builder.master("yarn").appName("test").config("spark.yarn.dist.archives","<path>/pyspark_conda_env.tar.gz#environment").getOrCreate()
我的 pandas udf 设置如下,我能够在单节点 spark 集群上工作,但现在在具有多个工作节点的集群上遇到问题:
#create udf to convert lat lon to h3 hex
def convert_to_h3(lat : pd.Series, lon : pd.Series) -> pd.Series:
import h3 as h3
import numpy as np
if ((None in [lat, lon]) | (np.isnan(lat))):
return None
else:
return (h3.geo_to_h3(lat=lat, lng=lon, resolution=7))
@f.pandas_udf('string', f.PandasUDFType.SCALAR)
def udf_convert_to_h3(lat : pd.Series, lon : pd.Series) -> pd.Series:
import pandas as pd
import numpy as np
df = pd.DataFrame({'lat' : lat, 'lon' : lon})
df['h3_res7'] = df.apply(lambda x : convert_to_h3(x['lat'], x['lon']), axis = 1)
return df['h3_res7']
在使用 pandas udf 创建新列并尝试查看它之后:
trip_starts = trip_starts.withColumn('h3_res7', udf_convert_to_h3(f.col('latitude'), f.col('longitude')))
我收到以下错误:
21/07/15 20:05:22 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 139 for reason Container marked as failed: container_1626376534301_0015_01_000158 on host: ip-xx-xxx-xx-xxx.aws.com. Exit status: -100. Diagnostics: Container released on a *lost* node.
我不确定在这里做什么,因为我已经尝试将记录数量缩减到更易于管理的数量,但仍在 运行 解决这个问题。理想情况下,我想弄清楚如何使用 PySpark 环境,如 databricks 博客 post 中所述,我链接而不是 运行ning bootstrap 由于公司政策而启动集群时的脚本使 bootstrap 脚本更难 运行。
我最终通过将我的数据重新分区为更小的分区来解决这个问题,每个分区中的记录更少。这解决了我的问题。