更改聚合子句是否会更改 pandas_udf - pyspark 中的任何内容?

Does changing aggregate clause change anything in pandas_udf - pyspark?

我是 spark 的新手,我想知道这是否会改变内存消耗以及任务分配给其工作人员的方式。请参阅下面的最小示例,以便您能够理解我的要求。

# import thing for the pandas udf
import pyspark.sql.functions as F
import pyspark.sql.types as T
# for creating minimal example
import pandas as pd
import numpy as np

#create minimal example 
df_minimal_example = pd.DataFrame({"x":np.arange(0,50,1), "y":np.arange(50,100,1) })
# crate a random integer 
df_minimal_example["PARTITION_ID"] = np.random.randint(0,2,size=len(df_minimal_example) )
sdf_minimal_example = spark.createDataFrame(df_minimal_example)

让我们打印输出

   x   y  PARTITION_ID
0  0  50             1
1  1  51             0
2  2  52             1
3  3  53             1
4  4  54             0

现在我将执行 pandas udf,以便能够在 spark

中使用我的 python 函数
schema =  T.StructType([T.StructField('xy', T.FloatType() ),
                        T.StructField('x2', T.FloatType() ),
                        T.StructField('y2', T.FloatType() ), 
                        T.StructField('PARTITION_ID', T.LongType() )
                       ]
                      )

@F.pandas_udf(schema, F.PandasUDFType.GROUPED_MAP)
def newfunction(pdf):
  pdf["xy"] = pdf["x"]*pdf["y"]
  pdf["x2"] = pdf["x"]*pdf["x"]
  pdf["y2"] = pdf["y"]*pdf["y"]
  cols2retrieve = ["PARTITION_ID","xy","x2","y2"]
  newpdf = pdf[cols2retrieve].copy()
  return newpdf
  
newpdf = sdf_minimal_example.groupby("PARTITION_ID").apply(newfunction)
# to see results
display(newpdf ) 

如您所见,我在应用 pandas udf 函数时使用 .groupby("PARTITION_ID");并且“PARTITION_ID”列有 1 或 0。问题是:如果 PARTITION_ID 有 0 到 100 之间的整数怎么办?例如:

#instead of this
 df_minimal_example["PARTITION_ID"] = np.random.randint(0,2,size=len(df_minimal_example) ) 
# use this
df_minimal_example["PARTITION_ID"] = np.random.randint(0,100,size=len(df_minimal_example) ) 

这是否会改变内存问题以及如何将任务分配给每个工作人员?如果有人可以提供更多关于此的信息,那就太好了。

groupby 是 Spark 中的 Wide 转换,这意味着需要对数据进行混洗,并且此操作通常会消耗内存。

将聚合键从 2 更改为 100 将如何影响性能很难提前判断,因为它取决于数据的“物理”重新分区。

您可以使用此 PARTITION_ID 重新分区您的数据,如果您将此列用于 joinsgroupby.

,它可以加快运行速度

我说“可以”是因为需要权衡取舍,拥有大量小文件可能会影响其他活动的性能,因此它不像在右列上重新分区以查看性能提升那样直接。

查看此post了解更多详情。