Pyspark DataFrame 按不属于组的项目分组

Pyspark DataFrame Grouping by item that doesn't belong to the group

我是 pyspark 的新手,我遇到了一种情况,你能帮我以如下方式获得结果吗:

customer_id item_id amount
1 tx1 15
1 tx2 10
1 tx3 14
2 tx1 15
2 tx4 12
3 tx2 10
2 tx6 43
4 tx4 12
5 tx8 76
6 tx6 43
5 tx6 43
3 tx6 43

并且想知道每个 item:

所以最后的 table 看起来像:

item_id target_cust taget_amount
tx1 4 227
tx2 4 201
tx3 5 297
tx4 4 --
tx6 3 --
tx8 5 --

请帮助我获得类似的输出,方向上的任何建议都很好

首先按 customer_id 分组并获取已购买 item_id 的列表以及关联的 amount,如下所示:

import pyspark.sql.functions as F

items_by_customer_df = df.groupBy("customer_id").agg(
    F.collect_set("item_id").alias("items"),
    F.sum("amount").alias("target_amount")

)

items_by_customer_df.show()
#+-----------+---------------+-------------+
#|customer_id|items          |target_amount|
#+-----------+---------------+-------------+
#|1          |[tx1, tx2, tx3]|39           |
#|2          |[tx1, tx6, tx4]|70           |
#|3          |[tx2, tx6]     |53           |
#|5          |[tx6, tx8]     |119          |
#|4          |[tx4]          |12           |
#|6          |[tx6]          |43           |
#+-----------+---------------+-------------+

现在,使用 array_contains 的否定作为条件,使用与原始 df 不同的 item_id 加入这个分组数据框,然后按 item_id 分组并进行聚合 count(customer_id) + sum(amount):

result = df.select("item_id").distinct().join(
    items_by_customer_df,
    ~F.array_contains("items", F.col("item_id"))
).groupBy("item_id").agg(
    F.count("customer_id").alias("target_cust"),
    F.sum("target_amount").alias("target_amount")
)

result.show()
#+-------+-----------+-------------+
#|item_id|target_cust|target_amount|
#+-------+-----------+-------------+
#|    tx2|          4|          244|
#|    tx4|          4|          254|
#|    tx1|          4|          227|
#|    tx8|          5|          217|
#|    tx3|          5|          297|
#|    tx6|          2|           51|
#+-------+-----------+-------------+