How can I count the number of records in the last 30 days for each user, per row, in PySpark?
I have a PySpark dataset with a sample snippet below. How can I populate recordcounts_in_last_30_days
with the number of records in the last 30 days for each user, per row, as shown below:
date | userid | comment | recordcounts_in_last_30_days |
---|---|---|---|
2022-01-15 09:00 | 1 | examplecomment1 | 0 |
2022-01-16 09:00 | 2 | examplecomment2 | 0 |
2022-01-25 09:00 | 1 | examplecomment3 | 1 |
2022-01-28 09:00 | 2 | examplecomment3 | 1 |
2022-02-26 09:00 | 2 | examplecomment3 | 1 |
2022-03-25 09:00 | 1 | examplecomment4 | 0 |
Please leave a comment if anything is unclear.
Use count over a window bounded with rangeBetween, like this:
from pyspark.sql import functions as F, Window
w = (Window.partitionBy("userid").orderBy(F.col("date").cast("long"))
     .rangeBetween(-30 * 86400, -1)  # 86400 = number of seconds in a day
)
result = (df.withColumn("date", F.to_timestamp("date", "yyyy-MM-dd HH:mm"))
.withColumn("recordcounts_in_last_30_days", F.count("*").over(w))
)
result.show()
#+-------------------+------+---------------+----------------------------+
#| date|userid| comment|recordcounts_in_last_30_days|
#+-------------------+------+---------------+----------------------------+
#|2022-01-15 09:00:00| 1|examplecomment1| 0|
#|2022-01-25 09:00:00| 1|examplecomment3| 1|
#|2022-03-25 09:00:00| 1|examplecomment4| 0|
#|2022-01-16 09:00:00| 2|examplecomment2| 0|
#|2022-01-28 09:00:00| 2|examplecomment3| 1|
#|2022-02-26 09:00:00| 2|examplecomment3| 1|
#+-------------------+------+---------------+----------------------------+
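As a sanity check on the expected column, the same rolling count can be reproduced in plain Python without Spark. This is a minimal sketch, not the answer's method: it mirrors the `rangeBetween(-30 * 86400, -1)` bounds by counting, for each row, earlier rows of the same user whose timestamp falls between 1 second and 30 days before the current row. The helper name `counts_last_30_days` is made up for illustration.

```python
from datetime import datetime, timedelta

# Sample rows from the question: (date string, userid)
rows = [
    ("2022-01-15 09:00", 1), ("2022-01-16 09:00", 2),
    ("2022-01-25 09:00", 1), ("2022-01-28 09:00", 2),
    ("2022-02-26 09:00", 2), ("2022-03-25 09:00", 1),
]

def counts_last_30_days(rows):
    parsed = [(datetime.strptime(d, "%Y-%m-%d %H:%M"), u) for d, u in rows]
    out = []
    for d, u in parsed:
        # Count rows of the same user whose date lies in
        # [current - 30 days, current - 1 second], matching
        # rangeBetween(-30 * 86400, -1) on epoch seconds.
        n = sum(
            1
            for d2, u2 in parsed
            if u2 == u and timedelta(seconds=1) <= d - d2 <= timedelta(days=30)
        )
        out.append(n)
    return out

print(counts_last_30_days(rows))  # [0, 0, 1, 1, 1, 0]
```

The result matches the recordcounts_in_last_30_days column above; note in particular that 2022-02-26 for user 2 counts only the 2022-01-28 row, since 2022-01-16 is more than 30 days earlier.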