在汇总其他人时计数不同?

Count distinct while aggregating others?

这是我的数据集的样子:

+---------+------------+-----------------+
|  name   |request_type| request_group_id|
+---------+------------+-----------------+
|Michael  |     X      |  1020           |
|Michael  |     X      |  1018           |
|Joe      |     Y      |  1018           |
|Sam      |     X      |  1018           |
|Michael  |     Y      |  1021           |
|Sam      |     X      |  1030           |
|Elizabeth|     Y      |  1035           |
+---------+------------+-----------------+

我想计算每个人 request_type 的数量并计算 unique request_group_id

结果应该如下:

+---------+--------------------+---------------------+--------------------------------+
|  name   |cnt(request_type(X))| cnt(request_type(Y))| cnt(distinct(request_group_id))|
+---------+--------------------+---------------------+--------------------------------+
|Michael  |          2         |         1           |      3                         |
|Joe      |          0         |         1           |      1                         |
|Sam      |          2         |         0           |      2                         |
|John     |          1         |         0           |      1                         |
|Elizabeth|          0         |         1           |      1                         |
+---------+--------------------+---------------------+--------------------------------+

到目前为止我所做的:(有助于导出前两列)

msgDataFrame.select(NAME, REQUEST_TYPE)
            .groupBy(NAME)
            .pivot(REQUEST_TYPE, Lists.newArrayList(X, Y))
            .agg(functions.count(REQUEST_TYPE))
            .show();

如何计算此 select 中不同的 request_group_id?可以在里面做吗?

我认为只有通过两个数据集连接才有可能(我当前的结果+不同的聚合request_group_id

示例 "countDistinct"("countDistinct" 未在 window 上工作,替换为 "size","collect_set"):

val groupIdWindow = Window.partitionBy("name")
df.select($"name", $"request_type",
      size(collect_set("request_group_id").over(groupIdWindow)).alias("countDistinct"))
  .groupBy("name", "countDistinct")
  .pivot($"request_type", Seq("X", "Y"))
  .agg(count("request_type"))
  .show(false)