Pyspark DataFrame 按不属于组的项目分组
Pyspark DataFrame Grouping by item that doesn't belong to the group
我是 pyspark 的新手,我遇到了一种情况,你能帮我以如下方式获得结果吗:
customer_id
item_id
amount
1
tx1
15
1
tx2
10
1
tx3
14
2
tx1
15
2
tx4
12
3
tx2
10
2
tx6
43
4
tx4
12
5
tx8
76
6
tx6
43
5
tx6
43
3
tx6
43
并且想知道每个 item
:
- 没有购买此商品的
customers
人 item
items
的 amount
的总和,而不是此 item
的 customers
。
所以最后的 table 看起来像:
item_id
target_cust
taget_amount
tx1
4
227
tx2
4
201
tx3
5
297
tx4
4
--
tx6
3
--
tx8
5
--
请帮助我获得类似的输出,方向上的任何建议都很好
首先按 customer_id
分组并获取已购买 item_id
的列表以及关联的 amount
,如下所示:
import pyspark.sql.functions as F
items_by_customer_df = df.groupBy("customer_id").agg(
F.collect_set("item_id").alias("items"),
F.sum("amount").alias("target_amount")
)
items_by_customer_df.show()
#+-----------+---------------+-------------+
#|customer_id|items |target_amount|
#+-----------+---------------+-------------+
#|1 |[tx1, tx2, tx3]|39 |
#|2 |[tx1, tx6, tx4]|70 |
#|3 |[tx2, tx6] |53 |
#|5 |[tx6, tx8] |119 |
#|4 |[tx4] |12 |
#|6 |[tx6] |43 |
#+-----------+---------------+-------------+
现在,使用 array_contains
的否定作为条件,使用与原始 df
不同的 item_id
加入这个分组数据框,然后按 item_id
分组并进行聚合 count(customer_id)
+ sum(amount)
:
result = df.select("item_id").distinct().join(
items_by_customer_df,
~F.array_contains("items", F.col("item_id"))
).groupBy("item_id").agg(
F.count("customer_id").alias("target_cust"),
F.sum("target_amount").alias("target_amount")
)
result.show()
#+-------+-----------+-------------+
#|item_id|target_cust|target_amount|
#+-------+-----------+-------------+
#| tx2| 4| 244|
#| tx4| 4| 254|
#| tx1| 4| 227|
#| tx8| 5| 217|
#| tx3| 5| 297|
#| tx6| 2| 51|
#+-------+-----------+-------------+
我是 pyspark 的新手,我遇到了一种情况,你能帮我以如下方式获得结果吗:
customer_id | item_id | amount |
---|---|---|
1 | tx1 | 15 |
1 | tx2 | 10 |
1 | tx3 | 14 |
2 | tx1 | 15 |
2 | tx4 | 12 |
3 | tx2 | 10 |
2 | tx6 | 43 |
4 | tx4 | 12 |
5 | tx8 | 76 |
6 | tx6 | 43 |
5 | tx6 | 43 |
3 | tx6 | 43 |
并且想知道每个 item
:
- 没有购买此商品的
customers
人item
items
的amount
的总和,而不是此item
的customers
。
所以最后的 table 看起来像:
item_id | target_cust | taget_amount |
---|---|---|
tx1 | 4 | 227 |
tx2 | 4 | 201 |
tx3 | 5 | 297 |
tx4 | 4 | -- |
tx6 | 3 | -- |
tx8 | 5 | -- |
请帮助我获得类似的输出,方向上的任何建议都很好
首先按 customer_id
分组并获取已购买 item_id
的列表以及关联的 amount
,如下所示:
import pyspark.sql.functions as F
items_by_customer_df = df.groupBy("customer_id").agg(
F.collect_set("item_id").alias("items"),
F.sum("amount").alias("target_amount")
)
items_by_customer_df.show()
#+-----------+---------------+-------------+
#|customer_id|items |target_amount|
#+-----------+---------------+-------------+
#|1 |[tx1, tx2, tx3]|39 |
#|2 |[tx1, tx6, tx4]|70 |
#|3 |[tx2, tx6] |53 |
#|5 |[tx6, tx8] |119 |
#|4 |[tx4] |12 |
#|6 |[tx6] |43 |
#+-----------+---------------+-------------+
现在,使用 array_contains
的否定作为条件,使用与原始 df
不同的 item_id
加入这个分组数据框,然后按 item_id
分组并进行聚合 count(customer_id)
+ sum(amount)
:
result = df.select("item_id").distinct().join(
items_by_customer_df,
~F.array_contains("items", F.col("item_id"))
).groupBy("item_id").agg(
F.count("customer_id").alias("target_cust"),
F.sum("target_amount").alias("target_amount")
)
result.show()
#+-------+-----------+-------------+
#|item_id|target_cust|target_amount|
#+-------+-----------+-------------+
#| tx2| 4| 244|
#| tx4| 4| 254|
#| tx1| 4| 227|
#| tx8| 5| 217|
#| tx3| 5| 297|
#| tx6| 2| 51|
#+-------+-----------+-------------+