获取 pyspark 数据框中过去 24 小时内重复值的计数
Get count of the value repeated in the last 24 hours in pyspark dataframe
请帮助我处理这个 pyspark 代码。我需要计算一个 ip 在过去 24 小时内出现的次数,不包括该实例。 ip 第一次出现在数据中时,count_last24hrs 列应 return 值为 0。从下一次开始,代码应计算同一 ip 在过去 24 小时内出现的次数该时间戳不包括该实例。
我尝试使用 window 函数,但没有得到想要的结果。
count_last24hrs 是结果应出现的列名。
使用这个数据框作为df
列名称为 (datetime, ip, count_last24hrs)
(10/05/2022 10:14:00 上午,1.1.1.1,0)
(10/05/2022 10:16:00 上午,1.1.1.1,1)
(10/05/2022 10:18:00 上午,2.2.2.2,0)
(10/05/2022 10:21:00 上午,1.1.1.1,2)
snapshot of the data using
我正在尝试的代码
#根据天数计算秒数的函数
天 = lambda i: i * 86400
#create window 通过将时间戳转换为长(秒数)
w = (Window.orderBy(F.col("datetime").cast('long')).rangeBetween(-days(1), 0))
#使用 collect_set 和大小函数对 window
执行 countDistinct
df_new= df.withColumn('count_last24hrs', F.size(F.collect_set("ip").over(w)) )
result = (df
.withColumn('ip_count', F.expr("count(ip_address) over (partition by ip_address order by datetimecol range between interval 24 hours preceding and current row)"))
.withColumn('ip_count',when(f.col('ip')==0,0).otherwise(f.col('ip')-1) ).
select('datetimecol', 'ip_address','ip_count')
第一个withColumn语句选取最近24小时内的数据,将数据按时间排序的“ip_address”分区,求累计和
第二个 withColumn 使计数减 1。因此第一个计数是 0 而不是 1。
结果:
datetimecol
ip
ip_last24_hrs
2022-05-10 10:14:00
1.1.1.1
0
2022-05-10 10:16:00
1.1.1.1
1
2022-05-10 10:18:00
2.2.2.2
0
2022-05-10 10:21:00
1.1.1.1
2
请帮助我处理这个 pyspark 代码。我需要计算一个 ip 在过去 24 小时内出现的次数,不包括该实例。 ip 第一次出现在数据中时,count_last24hrs 列应 return 值为 0。从下一次开始,代码应计算同一 ip 在过去 24 小时内出现的次数该时间戳不包括该实例。
我尝试使用 window 函数,但没有得到想要的结果。
count_last24hrs 是结果应出现的列名。
使用这个数据框作为df
列名称为 (datetime, ip, count_last24hrs)
(10/05/2022 10:14:00 上午,1.1.1.1,0)
(10/05/2022 10:16:00 上午,1.1.1.1,1)
(10/05/2022 10:18:00 上午,2.2.2.2,0)
(10/05/2022 10:21:00 上午,1.1.1.1,2)
snapshot of the data using
我正在尝试的代码
#根据天数计算秒数的函数
天 = lambda i: i * 86400
#create window 通过将时间戳转换为长(秒数)
w = (Window.orderBy(F.col("datetime").cast('long')).rangeBetween(-days(1), 0))
#使用 collect_set 和大小函数对 window
执行 countDistinctdf_new= df.withColumn('count_last24hrs', F.size(F.collect_set("ip").over(w)) )
result = (df
.withColumn('ip_count', F.expr("count(ip_address) over (partition by ip_address order by datetimecol range between interval 24 hours preceding and current row)"))
.withColumn('ip_count',when(f.col('ip')==0,0).otherwise(f.col('ip')-1) ).
select('datetimecol', 'ip_address','ip_count')
第一个withColumn语句选取最近24小时内的数据,将数据按时间排序的“ip_address”分区,求累计和
第二个 withColumn 使计数减 1。因此第一个计数是 0 而不是 1。
结果:
datetimecol | ip | ip_last24_hrs |
---|---|---|
2022-05-10 10:14:00 | 1.1.1.1 | 0 |
2022-05-10 10:16:00 | 1.1.1.1 | 1 |
2022-05-10 10:18:00 | 2.2.2.2 | 0 |
2022-05-10 10:21:00 | 1.1.1.1 | 2 |