在 Pyspark 的数据框中将每组总数添加为新行
Add total per group as a new row in dataframe in Pyspark
参考我之前的问题 如果我尝试计算和添加总行,每个品牌,父级和 week_num(总使用量)
这是虚拟样本:
df0 = spark.createDataFrame(
[
(2, "A", "A2", "A2web", 2500),
(2, "A", "A2", "A2TV", 3500),
(4, "A", "A1", "A2app", 5500),
(4, "A", "AD", "ADapp", 2000),
(4, "B", "B25", "B25app", 7600),
(4, "B", "B26", "B26app", 5600),
(5, "C", "c25", "c25app", 2658),
(5, "C", "c27", "c27app", 1100),
(5, "C", "c28", "c26app", 1200),
],
["week_num", "parent", "brand", "channel", "usage"],
)
此代码段为每个频道添加了总行数
# Group by and sum to get the totals
totals = (
df0.groupBy(["week_num", "parent", "brand"])
.agg(f.sum("usage").alias("usage"))
.withColumn("channel", f.lit("Total"))
)
# create a temp variable to sort
totals = totals.withColumn("sort_id", f.lit(2))
df0 = df0.withColumn("sort_id", f.lit(1))
# Union dataframes, drop temp variable and show
df1 = df0.unionByName(totals).sort(["week_num", "parent", "brand", "sort_id"])
df1.show()
结果:
+--------+------+-----+-------+-----+
|week_num|parent|brand|channel|usage|
+--------+------+-----+-------+-----+
| 2| A| A2| A2web| 2500|
| 2| A| A2| A2TV| 3500|
| 2| A| A2| Total| 6000|
| 4| A| A1| A2app| 5500|
| 4| A| A1| Total| 5500|
| 4| A| AD| ADapp| 2000|
| 4| A| AD| Total| 2000|
| 4| B| B25| B25app| 7600|
| 4| B| B25| Total| 7600|
| 4| B| B26| B26app| 5600|
| 4| B| B26| Total| 5600|
| 5| C| c25| c25app| 2658|
| 5| C| c25| Total| 2658|
| 5| C| c27| c27app| 1100|
| 5| C| c27| Total| 1100|
| 5| C| c28| c26app| 1200|
| 5| C| c28| Total| 1200|
+--------+------+-----+-------+-----+
通道列没问题,为了得到类似下面的结果,我简单地重复第一个过程 groupby+sum 然后将结果合并回来
+--------+------+-----+-------+-----+
|week_num|parent|brand|channel|usage|
+--------+------+-----+-------+-----+
| 2| A| A2| A2web| 2500|
| 2| A| A2| A2TV| 3500|
| 2| A| A2| Total| 6000|
| 2| A|Total| | 6000|
| 2| Total| | | 6000|
这里分两步
# add brand total row
df2 = (
df0.groupBy(["week_num", "parent"])
.agg(f.sum("usage").alias("usage"))
.withColumn("brand", f.lit("Total"))
.withColumn("channel", f.lit(""))
)
df2 = df1.unionByName(df2).sort(["week_num", "parent", "brand", "channel"])
# add weeknum total row
df3 = (
df0.groupBy(["week_num"])
.agg(f.sum("usage").alias("usage"))
.withColumn("parent", f.lit("Total"))
.withColumn("brand", f.lit(""))
.withColumn("channel", f.lit(""))
)
df3 = df2.unionByName(df3).sort(["week_num", "parent", "brand", "channel"])
结果:
+--------+------+-----+-------+-----+
|week_num|parent|brand|channel|usage|
+--------+------+-----+-------+-----+
| 2| A| A2| A2TV| 3500|
| 2| A| A2| A2web| 2500|
| 2| A| A2| Total| 6000|
| 2| A|Total| | 6000|
| 2| Total| | | 6000|
| 4| A| A1| A2app| 5500|
| 4| A| A1| Total| 5500|
| 4| A| AD| ADapp| 2000|
| 4| A| AD| Total| 2000|
| 4| A|Total| | 7500|
| 4| B| B25| B25app| 7600|
| 4| B| B25| Total| 7600|
| 4| B| B26| B26app| 5600|
| 4| B| B26| Total| 5600|
| 4| B|Total| |13200|
| 4| Total| | |20700|
| 5| C|Total| | 4958|
| 5| C| c25| Total| 2658|
| 5| C| c25| c25app| 2658|
| 5| C| c27| Total| 1100|
+--------+------+-----+-------+-----+
第一个问题,有没有替代方法或者更有效的方法而不重复?
其次,如果我想在每个组的顶部始终显示总计,而不考虑 parent/brand/channel 字母名称,我该如何排序。像这样:(这是虚拟数据,但我希望它足够清楚)
+--------+------+-----+-------+-----+
|week_num|parent|brand|channel|usage|
+--------+------+-----+-------+-----+
| 2| Total| | | 6000|
| 2| A|Total| | 6000|
| 2| A| A2| Total| 6000|
| 2| A| A2| A2TV| 3500|
| 2| A| A2| A2web| 2500|
| 4| Total| | |20700|
| 4| A|Total| | 7500|
| 4| B|Total| |13200|
| 4| A| A1| Total| 5500|
| 4| A| A1| A2app| 5500|
| 4| A| AD| Total| 2000|
| 4| A| AD| ADapp| 2000|
| 4| B| B25| Total| 7600|
| 4| B| B25| B25app| 7600|
| 4| B| B26| Total| 5600|
| 4| B| B26| B26app| 5600|
我认为你只需要 rollup
方法。
agg_df = (
df.rollup(["week_num", "parent", "brand", "channel"])
.agg(F.sum("usage").alias("usage"), F.grouping_id().alias("lvl"))
.orderBy(agg_cols)
)
agg_df.show()
+--------+------+-----+-------+-----+---+
|week_num|parent|brand|channel|usage|lvl|
+--------+------+-----+-------+-----+---+
| null| null| null| null|31658| 15|
| 2| null| null| null| 6000| 7|
| 2| A| null| null| 6000| 3|
| 2| A| A2| null| 6000| 1|
| 2| A| A2| A2TV| 3500| 0|
| 2| A| A2| A2web| 2500| 0|
| 4| null| null| null|20700| 7|
| 4| A| null| null| 7500| 3|
| 4| A| A1| null| 5500| 1|
| 4| A| A1| A2app| 5500| 0|
| 4| A| AD| null| 2000| 1|
| 4| A| AD| ADapp| 2000| 0|
| 4| B| null| null|13200| 3|
| 4| B| B25| null| 7600| 1|
| 4| B| B25| B25app| 7600| 0|
| 4| B| B26| null| 5600| 1|
| 4| B| B26| B26app| 5600| 0|
| 5| null| null| null| 4958| 7|
| 5| C| null| null| 4958| 3|
| 5| C| c25| null| 2658| 1|
+--------+------+-----+-------+-----+---+
only showing top 20 rows
剩下的就是纯粹的装饰品。使用 spark
可能不是一个好主意。最好在您之后将使用的恢复工具中执行此操作。
agg_df = agg_df.withColumn("lvl", F.dense_rank().over(Window.orderBy("lvl")))
TOTAL = "Total"
agg_df = (
agg_df.withColumn(
"parent", F.when(F.col("lvl") == 4, TOTAL).otherwise(F.col("parent"))
)
.withColumn(
"brand",
F.when(F.col("lvl") == 3, TOTAL).otherwise(
F.coalesce(F.col("brand"), F.lit(""))
),
)
.withColumn(
"channel",
F.when(F.col("lvl") == 2, TOTAL).otherwise(
F.coalesce(F.col("channel"), F.lit(""))
),
)
)
agg_df.where(F.col("lvl") != 5).orderBy(
"week_num", F.col("lvl").desc(), "parent", "brand", "channel"
).drop("lvl").show(500)
+--------+------+-----+-------+-----+
|week_num|parent|brand|channel|usage|
+--------+------+-----+-------+-----+
| 2| Total| | | 6000|
| 2| A|Total| | 6000|
| 2| A| A2| Total| 6000|
| 2| A| A2| A2TV| 3500|
| 2| A| A2| A2web| 2500|
| 4| Total| | |20700|
| 4| A|Total| | 7500|
| 4| B|Total| |13200|
| 4| A| A1| Total| 5500|
| 4| A| AD| Total| 2000|
| 4| B| B25| Total| 7600|
| 4| B| B26| Total| 5600|
| 4| A| A1| A2app| 5500|
| 4| A| AD| ADapp| 2000|
| 4| B| B25| B25app| 7600|
| 4| B| B26| B26app| 5600|
| 5| Total| | | 4958|
| 5| C|Total| | 4958|
| 5| C| c25| Total| 2658|
| 5| C| c27| Total| 1100|
| 5| C| c28| Total| 1200|
| 5| C| c25| c25app| 2658|
| 5| C| c27| c27app| 1100|
| 5| C| c28| c26app| 1200|
+--------+------+-----+-------+-----+
参考我之前的问题
这是虚拟样本:
df0 = spark.createDataFrame(
[
(2, "A", "A2", "A2web", 2500),
(2, "A", "A2", "A2TV", 3500),
(4, "A", "A1", "A2app", 5500),
(4, "A", "AD", "ADapp", 2000),
(4, "B", "B25", "B25app", 7600),
(4, "B", "B26", "B26app", 5600),
(5, "C", "c25", "c25app", 2658),
(5, "C", "c27", "c27app", 1100),
(5, "C", "c28", "c26app", 1200),
],
["week_num", "parent", "brand", "channel", "usage"],
)
此代码段为每个频道添加了总行数
# Group by and sum to get the totals
totals = (
df0.groupBy(["week_num", "parent", "brand"])
.agg(f.sum("usage").alias("usage"))
.withColumn("channel", f.lit("Total"))
)
# create a temp variable to sort
totals = totals.withColumn("sort_id", f.lit(2))
df0 = df0.withColumn("sort_id", f.lit(1))
# Union dataframes, drop temp variable and show
df1 = df0.unionByName(totals).sort(["week_num", "parent", "brand", "sort_id"])
df1.show()
结果:
+--------+------+-----+-------+-----+
|week_num|parent|brand|channel|usage|
+--------+------+-----+-------+-----+
| 2| A| A2| A2web| 2500|
| 2| A| A2| A2TV| 3500|
| 2| A| A2| Total| 6000|
| 4| A| A1| A2app| 5500|
| 4| A| A1| Total| 5500|
| 4| A| AD| ADapp| 2000|
| 4| A| AD| Total| 2000|
| 4| B| B25| B25app| 7600|
| 4| B| B25| Total| 7600|
| 4| B| B26| B26app| 5600|
| 4| B| B26| Total| 5600|
| 5| C| c25| c25app| 2658|
| 5| C| c25| Total| 2658|
| 5| C| c27| c27app| 1100|
| 5| C| c27| Total| 1100|
| 5| C| c28| c26app| 1200|
| 5| C| c28| Total| 1200|
+--------+------+-----+-------+-----+
通道列没问题,为了得到类似下面的结果,我简单地重复第一个过程 groupby+sum 然后将结果合并回来
+--------+------+-----+-------+-----+
|week_num|parent|brand|channel|usage|
+--------+------+-----+-------+-----+
| 2| A| A2| A2web| 2500|
| 2| A| A2| A2TV| 3500|
| 2| A| A2| Total| 6000|
| 2| A|Total| | 6000|
| 2| Total| | | 6000|
这里分两步
# add brand total row
df2 = (
df0.groupBy(["week_num", "parent"])
.agg(f.sum("usage").alias("usage"))
.withColumn("brand", f.lit("Total"))
.withColumn("channel", f.lit(""))
)
df2 = df1.unionByName(df2).sort(["week_num", "parent", "brand", "channel"])
# add weeknum total row
df3 = (
df0.groupBy(["week_num"])
.agg(f.sum("usage").alias("usage"))
.withColumn("parent", f.lit("Total"))
.withColumn("brand", f.lit(""))
.withColumn("channel", f.lit(""))
)
df3 = df2.unionByName(df3).sort(["week_num", "parent", "brand", "channel"])
结果:
+--------+------+-----+-------+-----+
|week_num|parent|brand|channel|usage|
+--------+------+-----+-------+-----+
| 2| A| A2| A2TV| 3500|
| 2| A| A2| A2web| 2500|
| 2| A| A2| Total| 6000|
| 2| A|Total| | 6000|
| 2| Total| | | 6000|
| 4| A| A1| A2app| 5500|
| 4| A| A1| Total| 5500|
| 4| A| AD| ADapp| 2000|
| 4| A| AD| Total| 2000|
| 4| A|Total| | 7500|
| 4| B| B25| B25app| 7600|
| 4| B| B25| Total| 7600|
| 4| B| B26| B26app| 5600|
| 4| B| B26| Total| 5600|
| 4| B|Total| |13200|
| 4| Total| | |20700|
| 5| C|Total| | 4958|
| 5| C| c25| Total| 2658|
| 5| C| c25| c25app| 2658|
| 5| C| c27| Total| 1100|
+--------+------+-----+-------+-----+
第一个问题,有没有替代方法或者更有效的方法而不重复? 其次,如果我想在每个组的顶部始终显示总计,而不考虑 parent/brand/channel 字母名称,我该如何排序。像这样:(这是虚拟数据,但我希望它足够清楚)
+--------+------+-----+-------+-----+
|week_num|parent|brand|channel|usage|
+--------+------+-----+-------+-----+
| 2| Total| | | 6000|
| 2| A|Total| | 6000|
| 2| A| A2| Total| 6000|
| 2| A| A2| A2TV| 3500|
| 2| A| A2| A2web| 2500|
| 4| Total| | |20700|
| 4| A|Total| | 7500|
| 4| B|Total| |13200|
| 4| A| A1| Total| 5500|
| 4| A| A1| A2app| 5500|
| 4| A| AD| Total| 2000|
| 4| A| AD| ADapp| 2000|
| 4| B| B25| Total| 7600|
| 4| B| B25| B25app| 7600|
| 4| B| B26| Total| 5600|
| 4| B| B26| B26app| 5600|
我认为你只需要 rollup
方法。
agg_df = (
df.rollup(["week_num", "parent", "brand", "channel"])
.agg(F.sum("usage").alias("usage"), F.grouping_id().alias("lvl"))
.orderBy(agg_cols)
)
agg_df.show()
+--------+------+-----+-------+-----+---+
|week_num|parent|brand|channel|usage|lvl|
+--------+------+-----+-------+-----+---+
| null| null| null| null|31658| 15|
| 2| null| null| null| 6000| 7|
| 2| A| null| null| 6000| 3|
| 2| A| A2| null| 6000| 1|
| 2| A| A2| A2TV| 3500| 0|
| 2| A| A2| A2web| 2500| 0|
| 4| null| null| null|20700| 7|
| 4| A| null| null| 7500| 3|
| 4| A| A1| null| 5500| 1|
| 4| A| A1| A2app| 5500| 0|
| 4| A| AD| null| 2000| 1|
| 4| A| AD| ADapp| 2000| 0|
| 4| B| null| null|13200| 3|
| 4| B| B25| null| 7600| 1|
| 4| B| B25| B25app| 7600| 0|
| 4| B| B26| null| 5600| 1|
| 4| B| B26| B26app| 5600| 0|
| 5| null| null| null| 4958| 7|
| 5| C| null| null| 4958| 3|
| 5| C| c25| null| 2658| 1|
+--------+------+-----+-------+-----+---+
only showing top 20 rows
剩下的就是纯粹的装饰品。使用 spark
可能不是一个好主意。最好在您之后将使用的恢复工具中执行此操作。
agg_df = agg_df.withColumn("lvl", F.dense_rank().over(Window.orderBy("lvl")))
TOTAL = "Total"
agg_df = (
agg_df.withColumn(
"parent", F.when(F.col("lvl") == 4, TOTAL).otherwise(F.col("parent"))
)
.withColumn(
"brand",
F.when(F.col("lvl") == 3, TOTAL).otherwise(
F.coalesce(F.col("brand"), F.lit(""))
),
)
.withColumn(
"channel",
F.when(F.col("lvl") == 2, TOTAL).otherwise(
F.coalesce(F.col("channel"), F.lit(""))
),
)
)
agg_df.where(F.col("lvl") != 5).orderBy(
"week_num", F.col("lvl").desc(), "parent", "brand", "channel"
).drop("lvl").show(500)
+--------+------+-----+-------+-----+
|week_num|parent|brand|channel|usage|
+--------+------+-----+-------+-----+
| 2| Total| | | 6000|
| 2| A|Total| | 6000|
| 2| A| A2| Total| 6000|
| 2| A| A2| A2TV| 3500|
| 2| A| A2| A2web| 2500|
| 4| Total| | |20700|
| 4| A|Total| | 7500|
| 4| B|Total| |13200|
| 4| A| A1| Total| 5500|
| 4| A| AD| Total| 2000|
| 4| B| B25| Total| 7600|
| 4| B| B26| Total| 5600|
| 4| A| A1| A2app| 5500|
| 4| A| AD| ADapp| 2000|
| 4| B| B25| B25app| 7600|
| 4| B| B26| B26app| 5600|
| 5| Total| | | 4958|
| 5| C|Total| | 4958|
| 5| C| c25| Total| 2658|
| 5| C| c27| Total| 1100|
| 5| C| c28| Total| 1200|
| 5| C| c25| c25app| 2658|
| 5| C| c27| c27app| 1100|
| 5| C| c28| c26app| 1200|
+--------+------+-----+-------+-----+