在 PySpark 中的 groupby 之后计算 sum 和 countDistinct
calculate the sum and countDistinct after groupby in PySpark
我有一个 PySpark 数据框,我想按几列分组,然后计算一些列的总和并计算另一列的不同值。由于 countDistinct
不是内置的聚合函数,我不能使用像我在这里尝试的那样的简单表达式:
sum_cols = ['a', 'b']
count_cols = ['id']
exprs1 = {x: "sum" for x in sum_cols}
exprs2 = {x: "countDistinct" for x in count_cols}
exprs = {**exprs1, **exprs2}
df_aggregated = df.groupby('month','product').agg(exprs)
我也尝试了 中的方法作为 exprs2 = [countDistinct(x) for x in count_cols]
但是当我 AssertionError: all exprs should be Column
时我收到一条错误消息,当我只对聚合列尝试它时。
我如何在一个聚合中合并求和和非重复计数?我知道,我可以用 sum
列做一次,用 countDistinct
列做一次,然后加入两个数据框,但应该有一个解决方案可以一步完成......
不确定为什么必须使用 expr
,但正常聚合应该有效。 countDistinct
是聚合函数。
(df
.groupBy('month','product')
.agg(
F.sum('a', 'b'),
F.countDistinct('id')
)
).show()
# +----+-----------+-------------+
# |name|sum(field1)|count(field1)|
# +----+-----------+-------------+
# | d| 0| 1|
# | c| 10| 1|
# | b| 5| 1|
# | a| 4| 1|
# +----+-----------+-------------+
不要使用 agg 的字典版本,而是使用带有列列表的版本:
from pyspark.sql import functions as F
df = ...
exprs1 = [F.sum(c) for c in sum_cols]
exprs2 = [F.countDistinct(c) for c in count_cols]
df_aggregated = df.groupby('month_product').agg(*(exprs1+exprs2))
如果你想保持当前的逻辑,你可以切换到 approx_count_distinct。与 countDistinct
不同,此函数可用作 SQL 函数。
我有一个 PySpark 数据框,我想按几列分组,然后计算一些列的总和并计算另一列的不同值。由于 countDistinct
不是内置的聚合函数,我不能使用像我在这里尝试的那样的简单表达式:
sum_cols = ['a', 'b']
count_cols = ['id']
exprs1 = {x: "sum" for x in sum_cols}
exprs2 = {x: "countDistinct" for x in count_cols}
exprs = {**exprs1, **exprs2}
df_aggregated = df.groupby('month','product').agg(exprs)
我也尝试了 exprs2 = [countDistinct(x) for x in count_cols]
但是当我 AssertionError: all exprs should be Column
时我收到一条错误消息,当我只对聚合列尝试它时。
我如何在一个聚合中合并求和和非重复计数?我知道,我可以用 sum
列做一次,用 countDistinct
列做一次,然后加入两个数据框,但应该有一个解决方案可以一步完成......
不确定为什么必须使用 expr
,但正常聚合应该有效。 countDistinct
是聚合函数。
(df
.groupBy('month','product')
.agg(
F.sum('a', 'b'),
F.countDistinct('id')
)
).show()
# +----+-----------+-------------+
# |name|sum(field1)|count(field1)|
# +----+-----------+-------------+
# | d| 0| 1|
# | c| 10| 1|
# | b| 5| 1|
# | a| 4| 1|
# +----+-----------+-------------+
不要使用 agg 的字典版本,而是使用带有列列表的版本:
from pyspark.sql import functions as F
df = ...
exprs1 = [F.sum(c) for c in sum_cols]
exprs2 = [F.countDistinct(c) for c in count_cols]
df_aggregated = df.groupby('month_product').agg(*(exprs1+exprs2))
如果你想保持当前的逻辑,你可以切换到 approx_count_distinct。与 countDistinct
不同,此函数可用作 SQL 函数。