Add total per group as a new row in dataframe in Pyspark

Following up on my previous question: I am trying to calculate and add total rows per brand, parent, and week_num (of total usage).

Here is a dummy sample:

from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = SparkSession.builder.getOrCreate()

df0 = spark.createDataFrame(
    [
        (2, "A", "A2", "A2web", 2500),
        (2, "A", "A2", "A2TV", 3500),
        (4, "A", "A1", "A2app", 5500),
        (4, "A", "AD", "ADapp", 2000),
        (4, "B", "B25", "B25app", 7600),
        (4, "B", "B26", "B26app", 5600),
        (5, "C", "c25", "c25app", 2658),
        (5, "C", "c27", "c27app", 1100),
        (5, "C", "c28", "c26app", 1200),
    ],
    ["week_num", "parent", "brand", "channel", "usage"],
)

This snippet adds a total row across channels for each week_num/parent/brand group:

# Group by and sum to get the totals
totals = (
    df0.groupBy(["week_num", "parent", "brand"])
    .agg(f.sum("usage").alias("usage"))
    .withColumn("channel", f.lit("Total"))
)

# Create a temp variable to sort on
totals = totals.withColumn("sort_id", f.lit(2))
df0 = df0.withColumn("sort_id", f.lit(1))

# Union dataframes, sort, drop the temp variable and show
df1 = (
    df0.unionByName(totals)
    .sort(["week_num", "parent", "brand", "sort_id"])
    .drop("sort_id")
)

df1.show()

Result:

+--------+------+-----+-------+-----+
|week_num|parent|brand|channel|usage|
+--------+------+-----+-------+-----+
|       2|     A|   A2|  A2web| 2500|
|       2|     A|   A2|   A2TV| 3500|
|       2|     A|   A2|  Total| 6000|
|       4|     A|   A1|  A2app| 5500|
|       4|     A|   A1|  Total| 5500|
|       4|     A|   AD|  ADapp| 2000|
|       4|     A|   AD|  Total| 2000|
|       4|     B|  B25| B25app| 7600|
|       4|     B|  B25|  Total| 7600|
|       4|     B|  B26| B26app| 5600|
|       4|     B|  B26|  Total| 5600|
|       5|     C|  c25| c25app| 2658|
|       5|     C|  c25|  Total| 2658|
|       5|     C|  c27| c27app| 1100|
|       5|     C|  c27|  Total| 1100|
|       5|     C|  c28| c26app| 1200|
|       5|     C|  c28|  Total| 1200|
+--------+------+-----+-------+-----+
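The `sort_id` trick is just a secondary sort key that forces detail rows (1) before the total row (2) within each group. In plain Python terms (hypothetical `(brand, channel, sort_id)` tuples, not the Spark API):

```python
# Hypothetical (brand, channel, sort_id) rows; the secondary key sort_id
# pushes the Total row after the detail rows within each brand.
rows = [
    ("A2", "Total", 2),
    ("A2", "A2web", 1),
    ("A2", "A2TV", 1),
]
# Sort by brand first, then by sort_id; Python's sort is stable, so
# rows that tie on the key keep their original relative order.
ordered = sorted(rows, key=lambda r: (r[0], r[2]))
print(ordered)
# [('A2', 'A2web', 1), ('A2', 'A2TV', 1), ('A2', 'Total', 2)]
```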

The channel column is fine. To get a result like the one below, I simply repeat the first groupby + sum process and union the result back in:

+--------+------+-----+-------+-----+ 
|week_num|parent|brand|channel|usage|
+--------+------+-----+-------+-----+
|       2|     A|   A2|  A2web| 2500|
|       2|     A|   A2|   A2TV| 3500|
|       2|     A|   A2|  Total| 6000|
|       2|     A|Total|       | 6000|
|       2| Total|     |       | 6000|

This is done in two steps:

# add brand total row
df2 = (
    df0.groupBy(["week_num", "parent"])
    .agg(f.sum("usage").alias("usage"))
    .withColumn("brand", f.lit("Total"))
    .withColumn("channel", f.lit(""))
)
df2 = df1.unionByName(df2).sort(["week_num", "parent", "brand", "channel"])

# add weeknum total row
df3 = (
    df0.groupBy(["week_num"])
    .agg(f.sum("usage").alias("usage"))
    .withColumn("parent", f.lit("Total"))
    .withColumn("brand", f.lit(""))
    .withColumn("channel", f.lit(""))
)
df3 = df2.unionByName(df3).sort(["week_num", "parent", "brand", "channel"])

Result:

+--------+------+-----+-------+-----+
|week_num|parent|brand|channel|usage|
+--------+------+-----+-------+-----+
|       2|     A|   A2|   A2TV| 3500|
|       2|     A|   A2|  A2web| 2500|
|       2|     A|   A2|  Total| 6000|
|       2|     A|Total|       | 6000|
|       2| Total|     |       | 6000|
|       4|     A|   A1|  A2app| 5500|
|       4|     A|   A1|  Total| 5500|
|       4|     A|   AD|  ADapp| 2000|
|       4|     A|   AD|  Total| 2000|
|       4|     A|Total|       | 7500|
|       4|     B|  B25| B25app| 7600|
|       4|     B|  B25|  Total| 7600|
|       4|     B|  B26| B26app| 5600|
|       4|     B|  B26|  Total| 5600|
|       4|     B|Total|       |13200|
|       4| Total|     |       |20700|
|       5|     C|Total|       | 4958|
|       5|     C|  c25|  Total| 2658|
|       5|     C|  c25| c25app| 2658|
|       5|     C|  c27|  Total| 1100|
+--------+------+-----+-------+-----+

First question: is there an alternative or more efficient way to do this without the repetition? Second, how can I sort so that the totals always appear at the top of each group, regardless of the alphabetical parent/brand/channel names? Like this (this is dummy data, but I hope it is clear enough):

+--------+------+-----+-------+-----+
|week_num|parent|brand|channel|usage|
+--------+------+-----+-------+-----+
|       2| Total|     |       | 6000|
|       2|     A|Total|       | 6000|
|       2|     A|   A2|  Total| 6000|
|       2|     A|   A2|   A2TV| 3500|
|       2|     A|   A2|  A2web| 2500|
|       4| Total|     |       |20700|
|       4|     A|Total|       | 7500|
|       4|     B|Total|       |13200|
|       4|     A|   A1|  Total| 5500| 
|       4|     A|   A1|  A2app| 5500|
|       4|     A|   AD|  Total| 2000|
|       4|     A|   AD|  ADapp| 2000|
|       4|     B|  B25|  Total| 7600|
|       4|     B|  B25| B25app| 7600|
|       4|     B|  B26|  Total| 5600|
|       4|     B|  B26| B26app| 5600|

I think you just need the rollup method.

import pyspark.sql.functions as F
from pyspark.sql import Window

agg_cols = ["week_num", "parent", "brand", "channel"]

agg_df = (
    df0.rollup(agg_cols)
    .agg(F.sum("usage").alias("usage"), F.grouping_id().alias("lvl"))
    .orderBy(agg_cols)
)

agg_df.show()
+--------+------+-----+-------+-----+---+
|week_num|parent|brand|channel|usage|lvl|
+--------+------+-----+-------+-----+---+
|    null|  null| null|   null|31658| 15|
|       2|  null| null|   null| 6000|  7|
|       2|     A| null|   null| 6000|  3|
|       2|     A|   A2|   null| 6000|  1|
|       2|     A|   A2|   A2TV| 3500|  0|
|       2|     A|   A2|  A2web| 2500|  0|
|       4|  null| null|   null|20700|  7|
|       4|     A| null|   null| 7500|  3|
|       4|     A|   A1|   null| 5500|  1|
|       4|     A|   A1|  A2app| 5500|  0|
|       4|     A|   AD|   null| 2000|  1|
|       4|     A|   AD|  ADapp| 2000|  0|
|       4|     B| null|   null|13200|  3|
|       4|     B|  B25|   null| 7600|  1|
|       4|     B|  B25| B25app| 7600|  0|
|       4|     B|  B26|   null| 5600|  1|
|       4|     B|  B26| B26app| 5600|  0|
|       5|  null| null|   null| 4958|  7|
|       5|     C| null|   null| 4958|  3|
|       5|     C|  c25|   null| 2658|  1|
+--------+------+-----+-------+-----+---+
only showing top 20 rows
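For reference, `grouping_id()` returns a bitmask in which the least significant bit corresponds to the last rollup column, and a set bit means that column has been aggregated away. A small plain-Python sketch decoding the `lvl` values above:

```python
# Decode grouping_id bitmasks for the rollup columns
# ["week_num", "parent", "brand", "channel"].
# Bit 0 (least significant) corresponds to the LAST column.
cols = ["week_num", "parent", "brand", "channel"]

def aggregated_columns(gid):
    """Return the columns that are rolled up (null) for a grouping id."""
    n = len(cols)
    return [cols[i] for i in range(n) if gid & (1 << (n - 1 - i))]

print(aggregated_columns(0))   # [] -> detail row
print(aggregated_columns(1))   # ['channel'] -> per-brand subtotal
print(aggregated_columns(3))   # ['brand', 'channel'] -> per-parent subtotal
print(aggregated_columns(7))   # ['parent', 'brand', 'channel'] -> per-week total
print(aggregated_columns(15))  # all four -> grand total
```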

The rest is purely cosmetic. Doing it in Spark is probably not a good idea; it is better handled in whatever reporting tool you use downstream. That said:

agg_df = agg_df.withColumn("lvl", F.dense_rank().over(Window.orderBy("lvl")))
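The `dense_rank` here just compresses the sparse `grouping_id` values {0, 1, 3, 7, 15} into consecutive levels 1..5, which is what makes the `lvl == 2/3/4` comparisons and the `lvl != 5` filter below readable. Conceptually (plain Python, not the Spark API):

```python
# dense_rank over the distinct grouping_id values assigns consecutive
# ranks starting at 1, in ascending order of the original values.
gids = [0, 1, 3, 7, 15]
rank = {g: i + 1 for i, g in enumerate(sorted(set(gids)))}
print(rank)  # {0: 1, 1: 2, 3: 3, 7: 4, 15: 5}
```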

TOTAL = "Total"
agg_df = (
    agg_df.withColumn(
        "parent", F.when(F.col("lvl") == 4, TOTAL).otherwise(F.col("parent"))
    )
    .withColumn(
        "brand",
        F.when(F.col("lvl") == 3, TOTAL).otherwise(
            F.coalesce(F.col("brand"), F.lit(""))
        ),
    )
    .withColumn(
        "channel",
        F.when(F.col("lvl") == 2, TOTAL).otherwise(
            F.coalesce(F.col("channel"), F.lit(""))
        ),
    )
)

agg_df.where(F.col("lvl") != 5).orderBy(
    "week_num", F.col("lvl").desc(), "parent", "brand", "channel"
).drop("lvl").show(500)

+--------+------+-----+-------+-----+
|week_num|parent|brand|channel|usage|
+--------+------+-----+-------+-----+
|       2| Total|     |       | 6000|
|       2|     A|Total|       | 6000|
|       2|     A|   A2|  Total| 6000|
|       2|     A|   A2|   A2TV| 3500|
|       2|     A|   A2|  A2web| 2500|
|       4| Total|     |       |20700|
|       4|     A|Total|       | 7500|
|       4|     B|Total|       |13200|
|       4|     A|   A1|  Total| 5500|
|       4|     A|   AD|  Total| 2000|
|       4|     B|  B25|  Total| 7600|
|       4|     B|  B26|  Total| 5600|
|       4|     A|   A1|  A2app| 5500|
|       4|     A|   AD|  ADapp| 2000|
|       4|     B|  B25| B25app| 7600|
|       4|     B|  B26| B26app| 5600|
|       5| Total|     |       | 4958|
|       5|     C|Total|       | 4958|
|       5|     C|  c25|  Total| 2658|
|       5|     C|  c27|  Total| 1100|
|       5|     C|  c28|  Total| 1200|
|       5|     C|  c25| c25app| 2658|
|       5|     C|  c27| c27app| 1100|
|       5|     C|  c28| c26app| 1200|
+--------+------+-----+-------+-----+