更改聚合子句是否会更改 pandas_udf - pyspark 中的任何内容?
Does changing aggregate clause change anything in pandas_udf - pyspark?
我是 spark 的新手,我想知道这是否会改变内存消耗以及任务分配给其工作人员的方式。请参阅下面的最小示例,以便您能够理解我的要求。
# import thing for the pandas udf
import pyspark.sql.functions as F
import pyspark.sql.types as T
# for creating minimal example
import pandas as pd
import numpy as np
#create minimal example
df_minimal_example = pd.DataFrame({"x":np.arange(0,50,1), "y":np.arange(50,100,1) })
# crate a random integer
df_minimal_example["PARTITION_ID"] = np.random.randint(0,2,size=len(df_minimal_example) )
sdf_minimal_example = spark.createDataFrame(df_minimal_example)
让我们打印输出
x y PARTITION_ID
0 0 50 1
1 1 51 0
2 2 52 1
3 3 53 1
4 4 54 0
现在我将执行 pandas udf,以便能够在 spark
中使用我的 python 函数
schema = T.StructType([T.StructField('xy', T.FloatType() ),
T.StructField('x2', T.FloatType() ),
T.StructField('y2', T.FloatType() ),
T.StructField('PARTITION_ID', T.LongType() )
]
)
@F.pandas_udf(schema, F.PandasUDFType.GROUPED_MAP)
def newfunction(pdf):
pdf["xy"] = pdf["x"]*pdf["y"]
pdf["x2"] = pdf["x"]*pdf["x"]
pdf["y2"] = pdf["y"]*pdf["y"]
cols2retrieve = ["PARTITION_ID","xy","x2","y2"]
newpdf = pdf[cols2retrieve].copy()
return newpdf
newpdf = sdf_minimal_example.groupby("PARTITION_ID").apply(newfunction)
# to see results
display(newpdf )
如您所见,我在应用 pandas udf 函数时使用 .groupby("PARTITION_ID");并且“PARTITION_ID”列有 1 或 0。问题是:如果 PARTITION_ID 有 0 到 100 之间的整数怎么办?例如:
#instead of this
df_minimal_example["PARTITION_ID"] = np.random.randint(0,2,size=len(df_minimal_example) )
# use this
df_minimal_example["PARTITION_ID"] = np.random.randint(0,100,size=len(df_minimal_example) )
这是否会改变内存问题以及如何将任务分配给每个工作人员?如果有人可以提供更多关于此的信息,那就太好了。
groupby
是 Spark 中的 Wide 转换,这意味着需要对数据进行混洗,并且此操作通常会消耗内存。
将聚合键从 2 更改为 100 将如何影响性能很难提前判断,因为它取决于数据的“物理”重新分区。
您可以使用此 PARTITION_ID
重新分区您的数据,如果您将此列用于 joins
或 groupby
.
,它可以加快运行速度
我说“可以”是因为需要权衡取舍,拥有大量小文件可能会影响其他活动的性能,因此它不像在右列上重新分区以查看性能提升那样直接。
查看此post了解更多详情。
我是 spark 的新手,我想知道这是否会改变内存消耗以及任务分配给其工作人员的方式。请参阅下面的最小示例,以便您能够理解我的要求。
# import thing for the pandas udf
import pyspark.sql.functions as F
import pyspark.sql.types as T
# for creating minimal example
import pandas as pd
import numpy as np
#create minimal example
df_minimal_example = pd.DataFrame({"x":np.arange(0,50,1), "y":np.arange(50,100,1) })
# crate a random integer
df_minimal_example["PARTITION_ID"] = np.random.randint(0,2,size=len(df_minimal_example) )
sdf_minimal_example = spark.createDataFrame(df_minimal_example)
让我们打印输出
x y PARTITION_ID
0 0 50 1
1 1 51 0
2 2 52 1
3 3 53 1
4 4 54 0
现在我将执行 pandas udf,以便能够在 spark
中使用我的 python 函数schema = T.StructType([T.StructField('xy', T.FloatType() ),
T.StructField('x2', T.FloatType() ),
T.StructField('y2', T.FloatType() ),
T.StructField('PARTITION_ID', T.LongType() )
]
)
@F.pandas_udf(schema, F.PandasUDFType.GROUPED_MAP)
def newfunction(pdf):
pdf["xy"] = pdf["x"]*pdf["y"]
pdf["x2"] = pdf["x"]*pdf["x"]
pdf["y2"] = pdf["y"]*pdf["y"]
cols2retrieve = ["PARTITION_ID","xy","x2","y2"]
newpdf = pdf[cols2retrieve].copy()
return newpdf
newpdf = sdf_minimal_example.groupby("PARTITION_ID").apply(newfunction)
# to see results
display(newpdf )
如您所见,我在应用 pandas udf 函数时使用 .groupby("PARTITION_ID");并且“PARTITION_ID”列有 1 或 0。问题是:如果 PARTITION_ID 有 0 到 100 之间的整数怎么办?例如:
#instead of this
df_minimal_example["PARTITION_ID"] = np.random.randint(0,2,size=len(df_minimal_example) )
# use this
df_minimal_example["PARTITION_ID"] = np.random.randint(0,100,size=len(df_minimal_example) )
这是否会改变内存问题以及如何将任务分配给每个工作人员?如果有人可以提供更多关于此的信息,那就太好了。
groupby
是 Spark 中的 Wide 转换,这意味着需要对数据进行混洗,并且此操作通常会消耗内存。
将聚合键从 2 更改为 100 将如何影响性能很难提前判断,因为它取决于数据的“物理”重新分区。
您可以使用此 PARTITION_ID
重新分区您的数据,如果您将此列用于 joins
或 groupby
.
我说“可以”是因为需要权衡取舍,拥有大量小文件可能会影响其他活动的性能,因此它不像在右列上重新分区以查看性能提升那样直接。
查看此post了解更多详情。