Pyspark 计数任何 ID 的任何滑动 window
Pyspark count on any sliding window for any ID
我有一个随时间变化的客户数字访问数据框,格式为:
|cust_id|datetime|
|1|2020-08-15 15:20|
|1|2020-08-15 16:20|
|1|2020-08-17 12:20|
|1|2020-08-19 14:20|
|1|2020-08-23 09:20|
|2|2020-08-24 08:00|
我想挑选出 强 信号,例如:5 天内至少访问 3 次的客户。
我最初的想法是,我们必须为每个客户计算 ALL 滑动 window。
在这个例子中,我们以 cust1 为例:
5 天 window 从 2020-08-15 开始,到 2020-08-19 结束,总访问量为 4
5 天 window 从 2020-08-16 开始,到 2020-08-20 结束,总访问量为 2
5 天 window 从 2020-08-17 开始,到 2020-08-21 结束,总访问量为 2
等等
所有滑动的最大计数 window 是 4。因此 cust1 符合条件“在 5 天内至少访问了 3 次”
这似乎是一个代价高昂的操作。
您将如何有效地实施它?欢迎任何其他想法。
您可以将datetime
列转换为long
并在rangeBetween()函数中传入相当于5天的秒数。
from pyspark.sql.functions import *
from pyspark.sql import functions as F
from pyspark.sql.window import Window
df = df.withColumn("date_long", to_date(substring(col("datetime"),0,10), "yyyy-MM-dd"))\
.withColumn("date_long", unix_timestamp('date_long', 'yyyy-MM-dd'))
days = lambda i: i * 86400
w = (Window.partitionBy('cust_id').orderBy("date_long").rangeBetween(0,days(5)))
df.withColumn('5_day_visit', F.count("*").over(w)).drop('date_long').show()
+-------+----------------+-----------+
|cust_id| datetime|5_day_visit|
+-------+----------------+-----------+
| 1|2020-08-15 15:20| 4|
| 1|2020-08-15 16:20| 4|
| 1|2020-08-17 12:20| 2|
| 1|2020-08-19 14:20| 2|
| 1|2020-08-23 09:20| 1|
| 2|2020-08-24 08:00| 1|
+-------+----------------+-----------+
要获得每位客户 5 天的最大访问次数,您可以执行以下操作:
df.withColumn('5_day_visit', F.count("*").over(w)).drop('date_long')\
.groupBy('cust_id').agg(F.max('5_day_visit').alias('max_5_day_visits')).show()
+-------+----------------+
|cust_id|max_5_day_visits|
+-------+----------------+
| 1| 4|
| 2| 1|
+-------+----------------+
我有一个随时间变化的客户数字访问数据框,格式为:
|cust_id|datetime|
|1|2020-08-15 15:20|
|1|2020-08-15 16:20|
|1|2020-08-17 12:20|
|1|2020-08-19 14:20|
|1|2020-08-23 09:20|
|2|2020-08-24 08:00|
我想挑选出 强 信号,例如:5 天内至少访问 3 次的客户。
我最初的想法是,我们必须为每个客户计算 ALL 滑动 window。
在这个例子中,我们以 cust1 为例:
5 天 window 从 2020-08-15 开始,到 2020-08-19 结束,总访问量为 4
5 天 window 从 2020-08-16 开始,到 2020-08-20 结束,总访问量为 2
5 天 window 从 2020-08-17 开始,到 2020-08-21 结束,总访问量为 2
等等
所有滑动的最大计数 window 是 4。因此 cust1 符合条件“在 5 天内至少访问了 3 次”
这似乎是一个代价高昂的操作。
您将如何有效地实施它?欢迎任何其他想法。
您可以将datetime
列转换为long
并在rangeBetween()函数中传入相当于5天的秒数。
from pyspark.sql.functions import *
from pyspark.sql import functions as F
from pyspark.sql.window import Window
df = df.withColumn("date_long", to_date(substring(col("datetime"),0,10), "yyyy-MM-dd"))\
.withColumn("date_long", unix_timestamp('date_long', 'yyyy-MM-dd'))
days = lambda i: i * 86400
w = (Window.partitionBy('cust_id').orderBy("date_long").rangeBetween(0,days(5)))
df.withColumn('5_day_visit', F.count("*").over(w)).drop('date_long').show()
+-------+----------------+-----------+
|cust_id| datetime|5_day_visit|
+-------+----------------+-----------+
| 1|2020-08-15 15:20| 4|
| 1|2020-08-15 16:20| 4|
| 1|2020-08-17 12:20| 2|
| 1|2020-08-19 14:20| 2|
| 1|2020-08-23 09:20| 1|
| 2|2020-08-24 08:00| 1|
+-------+----------------+-----------+
要获得每位客户 5 天的最大访问次数,您可以执行以下操作:
df.withColumn('5_day_visit', F.count("*").over(w)).drop('date_long')\
.groupBy('cust_id').agg(F.max('5_day_visit').alias('max_5_day_visits')).show()
+-------+----------------+
|cust_id|max_5_day_visits|
+-------+----------------+
| 1| 4|
| 2| 1|
+-------+----------------+