How to find and count rows with datetimes that happened within 1 second?

Suppose I have request logs with a user IP and a timestamp. How can I find the users who made 5 or more requests within a 1-second interval?

user_ip     time
---------------------------
user_ip1    16:11:10.56
user_ip1    16:11:10.67
user_ip1    16:11:10.87
user_ip2    16:11:10.92
user_ip2    16:11:10.97
user_ip1    16:11:11.15
user_ip1    16:11:11.20
user_ip1    16:11:11.30
user_ip2    16:11:12.13
user_ip2    16:11:13.50
user_ip2    16:11:13.80

user_ip1 made 3 requests at 16:11:10 and 3 requests at 16:11:11, i.e. 6 requests in the 0.74-second span starting at 10.56 and ending at 11.30, while user_ip2's requests are spread over several seconds.

The result for this test dataset should be a single row for user_ip1.

Since the windowing has to be done with arbitrary window start and end times, we can use a Spark window function to identify where a new 1-second interval starts.

We use lag to compute the time difference between the previous row and the current row; if the difference exceeds 1 second, the current row cannot be in the same window as the previous row, so we start a new window for this row.

Once we know which window each row belongs to, we can group by user_ip and window, apply a count aggregation, and filter to find the offending user IPs.

from datetime import datetime
from pyspark.sql import Window
from pyspark.sql import functions as F

max_rps = 5

data = [("user_ip1", datetime.strptime("16:11:10.56", "%H:%M:%S.%f")),
        ("user_ip1", datetime.strptime("16:11:10.67", "%H:%M:%S.%f")),
        ("user_ip1", datetime.strptime("16:11:10.87", "%H:%M:%S.%f")),
        ("user_ip2", datetime.strptime("16:11:10.92", "%H:%M:%S.%f")),
        ("user_ip2", datetime.strptime("16:11:10.97", "%H:%M:%S.%f")),
        ("user_ip1", datetime.strptime("16:11:11.15", "%H:%M:%S.%f")),
        ("user_ip1", datetime.strptime("16:11:11.20", "%H:%M:%S.%f")),
        ("user_ip1", datetime.strptime("16:11:11.30", "%H:%M:%S.%f")),
        ("user_ip2", datetime.strptime("16:11:12.13", "%H:%M:%S.%f")),
        ("user_ip2", datetime.strptime("16:11:13.50", "%H:%M:%S.%f")),
        ("user_ip2", datetime.strptime("16:11:13.80", "%H:%M:%S.%f"))]

df = spark.createDataFrame(data, ["user_ip", "time"])

window_spec = Window.partitionBy("user_ip").orderBy("time")

# diff:   time since the user's previous request (0 for the first request)
# rc:     1 when the gap exceeds 1 second, i.e. a new window starts at this row
# window: running sum of rc = id of the window the row belongs to
df.withColumn("diff", F.col("time") - F.coalesce(F.lag("time").over(window_spec), F.col("time")))\
  .withColumn("rc", F.when(F.col("diff") <= F.expr("INTERVAL 1 SECOND"), 0).otherwise(1))\
  .withColumn("window", F.sum("rc").over(window_spec))\
  .groupBy("user_ip", "window")\
  .agg(F.count("time").alias("rps"))\
  .filter(F.col("rps") >= max_rps)\
  .show(200, False)

Output

+--------+------+---+
|user_ip |window|rps|
+--------+------+---+
|user_ip1|0     |6  |
+--------+------+---+

A very simple technique: use lag(5) to get the row 5 rows before the current one. If no such row exists, null is returned. Then simply compute the time difference and filter. Part of the code is below; a sketch of the remaining diff-and-filter step follows the output.

data = [
    ("user_ip1", "16:11:10.56"),
    ("user_ip1", "16:11:10.67"),
    ("user_ip1", "16:11:10.87"),
    ("user_ip2", "16:11:10.92"),
    ("user_ip2","16:11:10.97"),
    ("user_ip1", "16:11:11.15"),
    ("user_ip1", "16:11:11.20"),
    ("user_ip1", "16:11:11.30"),
    ("user_ip2", "16:11:12.13"),
    ("user_ip2", "16:11:13.50"),
    ("user_ip2", "16:11:13.80")
    ]


columns = ["ip", "time"]

spark.createDataFrame(data).toDF(*columns).createOrReplaceTempView("data")

spark.sql("""
  select ip,
        time,
        lag(time, 5) over (partition by ip order by time asc) five_occurences_ago
  from data
""").show()

+--------+-----------+-------------------+
|      ip|       time|five_occurences_ago|
+--------+-----------+-------------------+
|user_ip1|16:11:10.56|               null|
|user_ip1|16:11:10.67|               null|
|user_ip1|16:11:10.87|               null|
|user_ip1|16:11:11.15|               null|
|user_ip1|16:11:11.20|               null|
|user_ip1|16:11:11.30|        16:11:10.56|
|user_ip2|16:11:10.92|               null|
|user_ip2|16:11:10.97|               null|
|user_ip2|16:11:12.13|               null|
|user_ip2|16:11:13.50|               null|
|user_ip2|16:11:13.80|               null|
+--------+-----------+-------------------+
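The query above only shows the lag step. A minimal sketch of the remaining diff-and-filter step could look like the following; it assumes the time strings can be parsed by prepending a dummy date (1970-01-01, only needed because the sample data has no date part), and the CTE names ts_data and lagged are illustrative:

spark.sql("""
  with ts_data as (
    -- turn the time strings into real timestamps by adding a dummy date
    select ip, to_timestamp(concat('1970-01-01 ', time)) as ts
    from data
  ),
  lagged as (
    -- for every request, fetch the timestamp of the request 5 rows earlier
    select ip, ts,
           lag(ts, 5) over (partition by ip order by ts) as five_occurences_ago
    from ts_data
  )
  -- keep IPs where that earlier request happened less than a second before;
  -- rows with a null lag are dropped automatically by the comparison
  select distinct ip
  from lagged
  where cast(ts as double) - cast(five_occurences_ago as double) < 1
""").show()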

Thanks to @Vitaly, I ended up with this solution:

from datetime import datetime
from pyspark.sql.window import Window
from pyspark.sql.functions import asc, col, lag, udf
from pyspark.sql.types import StructType, StructField, TimestampType, StringType, FloatType

data = [("user_ip1", datetime.strptime("2020-12-12 16:11:10.56", "%Y-%m-%d %H:%M:%S.%f")),
 ("user_ip1", datetime.strptime("2020-12-12 16:11:10.67", "%Y-%m-%d %H:%M:%S.%f")),
 ("user_ip1", datetime.strptime("2020-12-12 16:11:10.87", "%Y-%m-%d %H:%M:%S.%f")),
 ("user_ip2", datetime.strptime("2020-12-12 16:11:10.92", "%Y-%m-%d %H:%M:%S.%f")),
 ("user_ip2", datetime.strptime("2020-12-12 16:11:10.97", "%Y-%m-%d %H:%M:%S.%f")),
 ("user_ip1", datetime.strptime("2020-12-12 16:11:11.15", "%Y-%m-%d %H:%M:%S.%f")),
 ("user_ip1", datetime.strptime("2020-12-12 16:11:11.20", "%Y-%m-%d %H:%M:%S.%f")),
 ("user_ip1", datetime.strptime("2020-12-12 16:11:11.30", "%Y-%m-%d %H:%M:%S.%f")),
 ("user_ip2", datetime.strptime("2020-12-12 16:11:12.13", "%Y-%m-%d %H:%M:%S.%f")),
 ("user_ip2", datetime.strptime("2020-12-12 16:11:13.50", "%Y-%m-%d %H:%M:%S.%f")),
 ("user_ip2", datetime.strptime("2020-12-12 16:11:13.80", "%Y-%m-%d %H:%M:%S.%f")),
 ("user_ip2", datetime.strptime("2020-12-12 16:11:13.80", "%Y-%m-%d %H:%M:%S.%f")),
 ("user_ip2", datetime.strptime("2020-12-12 16:11:13.80", "%Y-%m-%d %H:%M:%S.%f")),
 ("user_ip2", datetime.strptime("2020-12-12 16:11:13.80", "%Y-%m-%d %H:%M:%S.%f")),
]

streaming_structure = StructType([
    StructField('user_ip', StringType(), True),
    StructField('ts', TimestampType(), True),
])


columns = ["user_ip", "ts"]

views_df = spark.createDataFrame(data, schema=streaming_structure)


# Difference between two timestamps; despite the name, the value is in seconds,
# which is what the "diff < 1" filter below expects.
def get_ms_diff(start, end):
    if start is None or end is None:
        return None
    return end.timestamp() - start.timestamp()

get_ms_diff_udf = udf(get_ms_diff, FloatType())

window = Window.partitionBy("user_ip").orderBy(asc("ts"))

# For every request, look 5 requests back; if that earlier request happened less
# than a second before the current one, the user sent 6 requests within 1 second.
suspicious_ips = views_df \
    .withColumn("five_ago", lag("ts", 5).over(window)) \
    .withColumn("diff", get_ms_diff_udf(col("five_ago"), col("ts"))) \
    .filter("diff < 1") \
    .select("user_ip", "ts", "five_ago", "diff") \
    .distinct()
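Not part of the original solution, but if only the offending IPs are needed rather than the matching rows, a final projection can be added, for example:

# Keep just the distinct user IPs that triggered the rule.
suspicious_ips.select("user_ip").distinct().show()

Note that lag("ts", 5) compares the current request with the one 5 rows earlier, so a match means 6 requests fell within one second; if exactly 5 requests should already count, the offset would presumably be 4 instead.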