Pyspark SQL Pandas 没有 GroupBy 的分组地图?
Pyspark SQL Pandas Grouped Map without GroupBy?
我有一个数据集,我想使用多个 Pyspark SQL Grouped Map UDF 在 AWS EMR 中的临时集群上运行的较大 ETL 过程的不同阶段进行映射。分组映射 API 要求在应用之前对 Pyspark 数据帧进行分组,但我实际上不需要对键进行分组。
目前,我使用的是任意分组,它有效,但结果是:
不必要的随机播放。
每个作业中任意 groupby 的 Hacky 代码。
我的理想解决方案允许矢量化 Pandas UDF 在没有任意分组的情况下应用,但如果我可以保存至少会消除随机分组的任意分组。
编辑:
我的代码如下所示。我最初使用的是任意分组,但目前正在尝试 spark_partition_id()
根据@pault 下面的评论。
@pandas_udf(b_schema, PandasUDFType.GROUPED_MAP)
def transform(a_partition):
b = a_partition.drop("pid", axis=1)
# Some other transform stuff
return b
(sql
.read.parquet(a_path)
.withColumn("pid", spark_partition_id())
.groupBy("pid")
.apply(transform)
.write.parquet(b_path))
使用 spark_partition_id()
似乎仍会导致随机播放。我得到以下 DAG:
第 1 阶段
- 扫描镶木地板
- 项目
- 项目
- 交换
第 2 阶段
- 交换
- 排序
- FlatMapGroupsInPandas
要支持大致等效的逻辑(函数 (pandas.core.frame.DataFrame) -> pandas.core.frame.DataFrame
),您必须切换到 Spark 3.0.0 并使用 MAP_ITER
转换。
在最新的预览版 (3.0.0-preview2) 中,您需要一个 UDF:
@pandas_udf(b_schema, PandasUDFType.MAP_ITER)
def transform(dfs):
for df in dfs:
b = df.drop("pid", axis=1)
...
yield b
df.mapInPandas(transform)
并且在即将发布的 3.0.0 版本 (SPARK-28264) 中只是一个简单的函数:
def transform(dfs):
for df in dfs:
b = df.drop("pid", axis=1)
# Some other transform stuff
...
yield b
df.mapInPandas(transform, b_schema)
2.x 上的一个可能的解决方法是使用纯 SCALAR
UDF,将结果的每一行序列化为 JSON,并在另一侧反序列化它,即
import json
from pyspark.sql.functions import from_json
@pandas_udf("string", PandasUDFType.SCALAR)
def transform(col1, col2):
b = pd.DataFrame({"x": col1, "y": col2})
...
return b.apply(lambda x: json.dumps(dict(zip(df.columns, x))), axis=1)
(df
.withColumn("json_result", transform("col1", "col2"))
.withColumn("a_struct", from_json("json_result", b_schema)))
我有一个数据集,我想使用多个 Pyspark SQL Grouped Map UDF 在 AWS EMR 中的临时集群上运行的较大 ETL 过程的不同阶段进行映射。分组映射 API 要求在应用之前对 Pyspark 数据帧进行分组,但我实际上不需要对键进行分组。
目前,我使用的是任意分组,它有效,但结果是:
不必要的随机播放。
每个作业中任意 groupby 的 Hacky 代码。
我的理想解决方案允许矢量化 Pandas UDF 在没有任意分组的情况下应用,但如果我可以保存至少会消除随机分组的任意分组。
编辑:
我的代码如下所示。我最初使用的是任意分组,但目前正在尝试 spark_partition_id()
根据@pault 下面的评论。
@pandas_udf(b_schema, PandasUDFType.GROUPED_MAP)
def transform(a_partition):
b = a_partition.drop("pid", axis=1)
# Some other transform stuff
return b
(sql
.read.parquet(a_path)
.withColumn("pid", spark_partition_id())
.groupBy("pid")
.apply(transform)
.write.parquet(b_path))
使用 spark_partition_id()
似乎仍会导致随机播放。我得到以下 DAG:
第 1 阶段
- 扫描镶木地板
- 项目
- 项目
- 交换
第 2 阶段
- 交换
- 排序
- FlatMapGroupsInPandas
要支持大致等效的逻辑(函数 (pandas.core.frame.DataFrame) -> pandas.core.frame.DataFrame
),您必须切换到 Spark 3.0.0 并使用 MAP_ITER
转换。
在最新的预览版 (3.0.0-preview2) 中,您需要一个 UDF:
@pandas_udf(b_schema, PandasUDFType.MAP_ITER)
def transform(dfs):
for df in dfs:
b = df.drop("pid", axis=1)
...
yield b
df.mapInPandas(transform)
并且在即将发布的 3.0.0 版本 (SPARK-28264) 中只是一个简单的函数:
def transform(dfs):
for df in dfs:
b = df.drop("pid", axis=1)
# Some other transform stuff
...
yield b
df.mapInPandas(transform, b_schema)
2.x 上的一个可能的解决方法是使用纯 SCALAR
UDF,将结果的每一行序列化为 JSON,并在另一侧反序列化它,即
import json
from pyspark.sql.functions import from_json
@pandas_udf("string", PandasUDFType.SCALAR)
def transform(col1, col2):
b = pd.DataFrame({"x": col1, "y": col2})
...
return b.apply(lambda x: json.dumps(dict(zip(df.columns, x))), axis=1)
(df
.withColumn("json_result", transform("col1", "col2"))
.withColumn("a_struct", from_json("json_result", b_schema)))