如何查找和计算日期时间在 1 秒内发生的行?
How to find and count rows with datetime that happened within 1 second?
假设我有带有用户 ip 和时间戳的请求日志。
如何找到在 1 秒间隔内发出超过或等于 5 个请求的用户?
user_ip time
---------------------------
user_ip1 16:11:10.56
user_ip1 16:11:10.67
user_ip1 16:11:10.87
user_ip2 16:11:10.92
user_ip2 16:11:10.97
user_ip1 16:11:11.15
user_ip1 16:11:11.20
user_ip1 16:11:11.30
user_ip2 16:11:12.13
user_ip2 16:11:13.50
user_ip2 16:11:13.80
user_ip1 在 16:11:10 中发出了 3 个请求,在 16:11:11 中发出了 3 个请求,即从 10.56 开始到 11.30 结束的 0.74 秒范围内的 6 个请求。虽然 user_ip2 请求分布在多秒内。
对于测试数据集,结果应该只有 user_ip1 一行。
由于 windowing 是在任意 window 开始和结束时间完成的,我们可以使用 Spark window
来识别新的 1 second
间隔的开始。
我们使用lag
计算上一行和当前行之间的时间差,如果这个差异超过 1 秒,则表示当前行不能归入包含前一行的 window,因此我们创建一个新的 window 来容纳这一行。
确定 windows 行所属后,我们可以分组,然后通过 user_ip, window
应用 count
聚合和过滤来找到合适的用户 IP。
from datetime import datetime
from pyspark.sql import Window
from pyspark.sql import functions as F
# Users issuing at least this many requests within one second are flagged.
max_rps = 5

# Sample request log: (user_ip, timestamp) pairs.
data = [("user_ip1", datetime.strptime("16:11:10.56", "%H:%M:%S.%f")),
("user_ip1", datetime.strptime("16:11:10.67", "%H:%M:%S.%f")),
("user_ip1", datetime.strptime("16:11:10.87", "%H:%M:%S.%f")),
("user_ip2", datetime.strptime("16:11:10.92", "%H:%M:%S.%f")),
("user_ip2", datetime.strptime("16:11:10.97", "%H:%M:%S.%f")),
("user_ip1", datetime.strptime("16:11:11.15", "%H:%M:%S.%f")),
("user_ip1", datetime.strptime("16:11:11.20", "%H:%M:%S.%f")),
("user_ip1", datetime.strptime("16:11:11.30", "%H:%M:%S.%f")),
("user_ip2", datetime.strptime("16:11:12.13", "%H:%M:%S.%f")),
("user_ip2", datetime.strptime("16:11:13.50", "%H:%M:%S.%f")),
("user_ip2", datetime.strptime("16:11:13.80", "%H:%M:%S.%f"))]

# FIX: the original snippet used `df` without ever defining it; build the
# DataFrame from the sample data first.
df = spark.createDataFrame(data, ["user_ip", "time"])

# Order each user's requests chronologically.
window_spec = Window.partitionBy("user_ip").orderBy("time")

# diff:   gap to the same user's previous request (0 for the first row).
# rc:     1 when the gap exceeds 1 second, i.e. the row opens a new window.
# window: running sum of rc = id of the window the row belongs to.
# FIX: the requirement is ">= max_rps requests", so filter with >=, not >
# (unchanged for the sample data: user_ip1 still matches with rps=6).
df.withColumn("diff", F.col("time") - F.coalesce(F.lag("time").over(window_spec), F.col("time")))\
.withColumn("rc", F.when(F.col("diff") <= F.expr("INTERVAL 1 SECOND"), 0).otherwise(1))\
.withColumn("window", F.sum("rc").over(window_spec))\
.groupBy("user_ip", "window")\
.agg(F.count('time').alias('rps'))\
.filter(F.col("rps") >= max_rps)\
.show(200, False)
输出
+--------+------+---+
|user_ip |window|rps|
+--------+------+---+
|user_ip1|0 |6 |
+--------+------+---+
一个非常简单的技巧:使用 lag(5) 获取当前行之前第 5 行。如果这样的行不存在,则返回 null。
然后简单地计算时间差并进行过滤。部分代码如下:
# Sample request log: (ip, time-of-day string) pairs for two users.
data = [
    ("user_ip1", "16:11:10.56"),
    ("user_ip1", "16:11:10.67"),
    ("user_ip1", "16:11:10.87"),
    ("user_ip2", "16:11:10.92"),
    ("user_ip2", "16:11:10.97"),
    ("user_ip1", "16:11:11.15"),
    ("user_ip1", "16:11:11.20"),
    ("user_ip1", "16:11:11.30"),
    ("user_ip2", "16:11:12.13"),
    ("user_ip2", "16:11:13.50"),
    ("user_ip2", "16:11:13.80"),
]
columns = ["ip", "time"]
# Register the log as a temp view so it can be queried with plain SQL.
spark.createDataFrame(data, columns).createOrReplaceTempView("data")
# five_occurences_ago: timestamp of the 5th-previous request by the same
# ip, or NULL when fewer than 5 earlier rows exist.
spark.sql("""
select ip,
time,
lag(time, 5) over (partition by ip order by time asc) five_occurences_ago
from data
""").show()
+--------+-----------+-------------------+
| ip| time|five_occurences_ago|
+--------+-----------+-------------------+
|user_ip1|16:11:10.56| null|
|user_ip1|16:11:10.67| null|
|user_ip1|16:11:10.87| null|
|user_ip1|16:11:11.15| null|
|user_ip1|16:11:11.20| null|
|user_ip1|16:11:11.30| 16:11:10.56|
|user_ip2|16:11:10.92| null|
|user_ip2|16:11:10.97| null|
|user_ip2|16:11:12.13| null|
|user_ip2|16:11:13.50| null|
|user_ip2|16:11:13.80| null|
+--------+-----------+-------------------+
感谢@Vitaly,我最终找到了这个解决方案
from pyspark.sql.window import Window
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, TimestampType, IntegerType, StringType, FloatType
from datetime import datetime
from pyspark.sql.functions import udf
# Request log fixture; the last four user_ip2 rows are intentional
# duplicates of the same timestamp.
_TS_FMT = "%Y-%m-%d %H:%M:%S.%f"
data = [
    (ip, datetime.strptime(ts, _TS_FMT))
    for ip, ts in [
        ("user_ip1", "2020-12-12 16:11:10.56"),
        ("user_ip1", "2020-12-12 16:11:10.67"),
        ("user_ip1", "2020-12-12 16:11:10.87"),
        ("user_ip2", "2020-12-12 16:11:10.92"),
        ("user_ip2", "2020-12-12 16:11:10.97"),
        ("user_ip1", "2020-12-12 16:11:11.15"),
        ("user_ip1", "2020-12-12 16:11:11.20"),
        ("user_ip1", "2020-12-12 16:11:11.30"),
        ("user_ip2", "2020-12-12 16:11:12.13"),
        ("user_ip2", "2020-12-12 16:11:13.50"),
        ("user_ip2", "2020-12-12 16:11:13.80"),
        ("user_ip2", "2020-12-12 16:11:13.80"),
        ("user_ip2", "2020-12-12 16:11:13.80"),
        ("user_ip2", "2020-12-12 16:11:13.80"),
    ]
]
# Explicit schema: nullable user_ip string and event timestamp.
streaming_structure = StructType([
    StructField('user_ip', StringType(), True),
    StructField('ts', TimestampType(), True),
])
columns = ["user_ip", "ts"]
views_df = spark.createDataFrame(data, schema=streaming_structure)
def get_ms_diff(start, end):
    """Return ``end - start`` in **seconds** (despite the "ms" name), or
    None when either timestamp is missing (e.g. produced by lag()).

    FIX: uses naive timedelta arithmetic instead of ``timestamp()``, so
    the result no longer depends on the local timezone / DST transitions.
    """
    if start is None or end is None:
        return None
    return (end - start).total_seconds()
# Wrap the plain-Python diff as a Spark UDF (returns seconds as float).
get_ms_diff_udf = udf(get_ms_diff, FloatType())
# Order each user's events chronologically for the lag() below.
window = Window.partitionBy("user_ip").orderBy(asc("ts"))
# Keep rows whose 5th-previous request by the same user happened less
# than 1 second earlier, i.e. at least 6 requests within one second.
# Rows with no 5th-previous request get diff = NULL, which the filter
# drops — that is the intended behavior.
# NOTE(review): for the stated ">= 5 requests per second" requirement the
# lag offset should probably be 4, not 5 — confirm intent.
suspicious_ips = views_df \
.withColumn("five_ago", lag("ts", 5).over(window)) \
.withColumn("diff", get_ms_diff_udf(col("five_ago"), col("ts"))) \
.filter("diff < 1") \
.select("user_ip", "ts", "five_ago", "diff") \
.distinct()
假设我有带有用户 ip 和时间戳的请求日志。 如何找到在 1 秒间隔内发出超过或等于 5 个请求的用户?
user_ip time
---------------------------
user_ip1 16:11:10.56
user_ip1 16:11:10.67
user_ip1 16:11:10.87
user_ip2 16:11:10.92
user_ip2 16:11:10.97
user_ip1 16:11:11.15
user_ip1 16:11:11.20
user_ip1 16:11:11.30
user_ip2 16:11:12.13
user_ip2 16:11:13.50
user_ip2 16:11:13.80
user_ip1 在 16:11:10 中发出了 3 个请求,在 16:11:11 中发出了 3 个请求,即从 10.56 开始到 11.30 结束的 0.74 秒范围内的 6 个请求。虽然 user_ip2 请求分布在多秒内。
对于测试数据集,结果应该只有 user_ip1 一行。
由于 windowing 是在任意 window 开始和结束时间完成的,我们可以使用 Spark window
来识别新的 1 second
间隔的开始。
我们使用lag
计算上一行和当前行之间的时间差,如果这个差异超过 1 秒,则表示当前行不能归入包含前一行的 window,因此我们创建一个新的 window 来容纳这一行。
确定 windows 行所属后,我们可以分组,然后通过 user_ip, window
应用 count
聚合和过滤来找到合适的用户 IP。
from datetime import datetime
from pyspark.sql import Window
from pyspark.sql import functions as F
# Users issuing at least this many requests within one second are flagged.
max_rps = 5

# Sample request log: (user_ip, timestamp) pairs.
data = [("user_ip1", datetime.strptime("16:11:10.56", "%H:%M:%S.%f")),
("user_ip1", datetime.strptime("16:11:10.67", "%H:%M:%S.%f")),
("user_ip1", datetime.strptime("16:11:10.87", "%H:%M:%S.%f")),
("user_ip2", datetime.strptime("16:11:10.92", "%H:%M:%S.%f")),
("user_ip2", datetime.strptime("16:11:10.97", "%H:%M:%S.%f")),
("user_ip1", datetime.strptime("16:11:11.15", "%H:%M:%S.%f")),
("user_ip1", datetime.strptime("16:11:11.20", "%H:%M:%S.%f")),
("user_ip1", datetime.strptime("16:11:11.30", "%H:%M:%S.%f")),
("user_ip2", datetime.strptime("16:11:12.13", "%H:%M:%S.%f")),
("user_ip2", datetime.strptime("16:11:13.50", "%H:%M:%S.%f")),
("user_ip2", datetime.strptime("16:11:13.80", "%H:%M:%S.%f"))]

# FIX: the original snippet used `df` without ever defining it; build the
# DataFrame from the sample data first.
df = spark.createDataFrame(data, ["user_ip", "time"])

# Order each user's requests chronologically.
window_spec = Window.partitionBy("user_ip").orderBy("time")

# diff:   gap to the same user's previous request (0 for the first row).
# rc:     1 when the gap exceeds 1 second, i.e. the row opens a new window.
# window: running sum of rc = id of the window the row belongs to.
# FIX: the requirement is ">= max_rps requests", so filter with >=, not >
# (unchanged for the sample data: user_ip1 still matches with rps=6).
df.withColumn("diff", F.col("time") - F.coalesce(F.lag("time").over(window_spec), F.col("time")))\
.withColumn("rc", F.when(F.col("diff") <= F.expr("INTERVAL 1 SECOND"), 0).otherwise(1))\
.withColumn("window", F.sum("rc").over(window_spec))\
.groupBy("user_ip", "window")\
.agg(F.count('time').alias('rps'))\
.filter(F.col("rps") >= max_rps)\
.show(200, False)
输出
+--------+------+---+
|user_ip |window|rps|
+--------+------+---+
|user_ip1|0 |6 |
+--------+------+---+
一个非常简单的技巧:使用 lag(5) 获取当前行之前第 5 行。如果这样的行不存在,则返回 null。 然后简单地计算时间差并进行过滤。部分代码如下:
# Sample request log: (ip, time-of-day string) pairs for two users.
data = [
    ("user_ip1", "16:11:10.56"),
    ("user_ip1", "16:11:10.67"),
    ("user_ip1", "16:11:10.87"),
    ("user_ip2", "16:11:10.92"),
    ("user_ip2", "16:11:10.97"),
    ("user_ip1", "16:11:11.15"),
    ("user_ip1", "16:11:11.20"),
    ("user_ip1", "16:11:11.30"),
    ("user_ip2", "16:11:12.13"),
    ("user_ip2", "16:11:13.50"),
    ("user_ip2", "16:11:13.80"),
]
columns = ["ip", "time"]
# Register the log as a temp view so it can be queried with plain SQL.
spark.createDataFrame(data, columns).createOrReplaceTempView("data")
# five_occurences_ago: timestamp of the 5th-previous request by the same
# ip, or NULL when fewer than 5 earlier rows exist.
spark.sql("""
select ip,
time,
lag(time, 5) over (partition by ip order by time asc) five_occurences_ago
from data
""").show()
+--------+-----------+-------------------+
| ip| time|five_occurences_ago|
+--------+-----------+-------------------+
|user_ip1|16:11:10.56| null|
|user_ip1|16:11:10.67| null|
|user_ip1|16:11:10.87| null|
|user_ip1|16:11:11.15| null|
|user_ip1|16:11:11.20| null|
|user_ip1|16:11:11.30| 16:11:10.56|
|user_ip2|16:11:10.92| null|
|user_ip2|16:11:10.97| null|
|user_ip2|16:11:12.13| null|
|user_ip2|16:11:13.50| null|
|user_ip2|16:11:13.80| null|
+--------+-----------+-------------------+
感谢@Vitaly,我最终找到了这个解决方案
from pyspark.sql.window import Window
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, TimestampType, IntegerType, StringType, FloatType
from datetime import datetime
from pyspark.sql.functions import udf
# Request log fixture; the last four user_ip2 rows are intentional
# duplicates of the same timestamp.
_TS_FMT = "%Y-%m-%d %H:%M:%S.%f"
data = [
    (ip, datetime.strptime(ts, _TS_FMT))
    for ip, ts in [
        ("user_ip1", "2020-12-12 16:11:10.56"),
        ("user_ip1", "2020-12-12 16:11:10.67"),
        ("user_ip1", "2020-12-12 16:11:10.87"),
        ("user_ip2", "2020-12-12 16:11:10.92"),
        ("user_ip2", "2020-12-12 16:11:10.97"),
        ("user_ip1", "2020-12-12 16:11:11.15"),
        ("user_ip1", "2020-12-12 16:11:11.20"),
        ("user_ip1", "2020-12-12 16:11:11.30"),
        ("user_ip2", "2020-12-12 16:11:12.13"),
        ("user_ip2", "2020-12-12 16:11:13.50"),
        ("user_ip2", "2020-12-12 16:11:13.80"),
        ("user_ip2", "2020-12-12 16:11:13.80"),
        ("user_ip2", "2020-12-12 16:11:13.80"),
        ("user_ip2", "2020-12-12 16:11:13.80"),
    ]
]
# Explicit schema: nullable user_ip string and event timestamp.
streaming_structure = StructType([
    StructField('user_ip', StringType(), True),
    StructField('ts', TimestampType(), True),
])
columns = ["user_ip", "ts"]
views_df = spark.createDataFrame(data, schema=streaming_structure)
def get_ms_diff(start, end):
    """Return ``end - start`` in **seconds** (despite the "ms" name), or
    None when either timestamp is missing (e.g. produced by lag()).

    FIX: uses naive timedelta arithmetic instead of ``timestamp()``, so
    the result no longer depends on the local timezone / DST transitions.
    """
    if start is None or end is None:
        return None
    return (end - start).total_seconds()
# Wrap the plain-Python diff as a Spark UDF (returns seconds as float).
get_ms_diff_udf = udf(get_ms_diff, FloatType())
# Order each user's events chronologically for the lag() below.
window = Window.partitionBy("user_ip").orderBy(asc("ts"))
# Keep rows whose 5th-previous request by the same user happened less
# than 1 second earlier, i.e. at least 6 requests within one second.
# Rows with no 5th-previous request get diff = NULL, which the filter
# drops — that is the intended behavior.
# NOTE(review): for the stated ">= 5 requests per second" requirement the
# lag offset should probably be 4, not 5 — confirm intent.
suspicious_ips = views_df \
.withColumn("five_ago", lag("ts", 5).over(window)) \
.withColumn("diff", get_ms_diff_udf(col("five_ago"), col("ts"))) \
.filter("diff < 1") \
.select("user_ip", "ts", "five_ago", "diff") \
.distinct()