获取 pyspark 数据框中过去 24 小时内重复值的计数

Get count of the value repeated in the last 24 hours in pyspark dataframe

请帮助我处理这个 pyspark 代码。我需要计算一个 ip 在过去 24 小时内出现的次数,不包括该实例。 ip 第一次出现在数据中时,count_last24hrs 列应 return 值为 0。从下一次开始,代码应计算同一 ip 在过去 24 小时内出现的次数该时间戳不包括该实例。

我尝试使用 window 函数,但没有得到想要的结果。

count_last24hrs 是结果应出现的列名。

使用这个数据框作为df

列名称为 (datetime, ip, count_last24hrs)

(10/05/2022 10:14:00 上午,1.1.1.1,0)

(10/05/2022 10:16:00 上午,1.1.1.1,1)

(10/05/2022 10:18:00 上午,2.2.2.2,0)

(10/05/2022 10:21:00 上午,1.1.1.1,2)

snapshot of the data using

我正在尝试的代码

#根据天数计算秒数的函数

天 = lambda i: i * 86400

#create window 通过将时间戳转换为长(秒数)

w = (Window.orderBy(F.col("datetime").cast('long')).rangeBetween(-days(1), 0))

#使用 collect_set 和大小函数对 window

执行 countDistinct

df_new= df.withColumn('count_last24hrs', F.size(F.collect_set("ip").over(w)) )

result = (df
.withColumn('ip_count', F.expr("count(ip_address) over (partition by ip_address order by datetimecol range between interval 24 hours preceding and current row)"))
.withColumn('ip_count',when(f.col('ip')==0,0).otherwise(f.col('ip')-1) ).
select('datetimecol', 'ip_address','ip_count')

第一个withColumn语句选取最近24小时内的数据,将数据按时间排序的“ip_address”分区,求累计和

第二个 withColumn 使计数减 1。因此第一个计数是 0 而不是 1。

结果:

datetimecol ip ip_last24_hrs
2022-05-10 10:14:00 1.1.1.1 0
2022-05-10 10:16:00 1.1.1.1 1
2022-05-10 10:18:00 2.2.2.2 0
2022-05-10 10:21:00 1.1.1.1 2