Spark: detect and extract a pattern in column values
I have a DataFrame like this:
import spark.implicits._
import org.apache.spark.sql.functions._
val latenies = Seq(
  ("start","304875","2021-10-25 21:26:23.486027"),
  ("start","304875","2021-10-25 21:26:23.486670"),
  ("end","304875","2021-10-25 21:26:23.487590"),
  ("start","304875","2021-10-25 21:26:23.509683"),
  ("end","304875","2021-10-25 21:26:23.509689"),
  ("end","304875","2021-10-25 21:26:23.510154"),
  ("start","201345","2021-10-25 21:26:23.510156"),
  ("end","201345","2021-10-25 21:26:23.510159"),
  ("start","201345","2021-10-25 21:26:23.510333"),
  ("start","201345","2021-10-25 21:26:23.510335"),
  ("end","201345","2021-10-25 21:26:23.513177"),
  ("start","201345","2021-10-25 21:26:23.513187")
)
val latenies_df = latenies.toDF("Msg_name","Id_num","TimeStamp")
  .withColumn("TimeStamp", to_timestamp(col("TimeStamp")))
latenies_df.show(false)
which looks like this:
+--------+------+--------------------------+
|Msg_name|Id_num|TimeStamp |
+--------+------+--------------------------+
|start |304875|2021-10-25 21:26:23.486027|
|start |304875|2021-10-25 21:26:23.48667 |
|end |304875|2021-10-25 21:26:23.48759 |
|start |304875|2021-10-25 21:26:23.509683|
|end |304875|2021-10-25 21:26:23.509689|
|end |304875|2021-10-25 21:26:23.510154|
|start |201345|2021-10-25 21:26:23.510156|
|end |201345|2021-10-25 21:26:23.510159|
|start |201345|2021-10-25 21:26:23.510333|
|start |201345|2021-10-25 21:26:23.510335|
|end |201345|2021-10-25 21:26:23.513177|
|start |201345|2021-10-25 21:26:23.513187|
+--------+------+--------------------------+
Question: I want to extract a specific pattern from the column Msg_name: a start whose immediately following value is end, when partitioned by Id_num and ordered by TimeStamp. Msg_name can have multiple consecutive starts or ends; I only want start-end pairs with nothing in between.
From this pattern I want to build a df like this:
+-----------+--------------------------+--------------------------+------+
|patter_name|Timestamp_start           |Timestamp_end             |Id_num|
+-----------+--------------------------+--------------------------+------+
|pattern1   |2021-10-25 21:26:23.486670|2021-10-25 21:26:23.487590|304875|
|pattern1   |2021-10-25 21:26:23.509683|2021-10-25 21:26:23.509689|304875|
|pattern1   |2021-10-25 21:26:23.510156|2021-10-25 21:26:23.510159|201345|
|pattern1   |2021-10-25 21:26:23.510335|2021-10-25 21:26:23.513177|201345|
+-----------+--------------------------+--------------------------+------+
What I did was shift the frame with a window function, but because of the nature of the Msg_name column this does not give me the right answer:
val window = org.apache.spark.sql.expressions.Window.partitionBy("Id_num").orderBy("TimeStamp")
val df_only_pattern = latenies_df
  .withColumn("TimeStamp_start",
    when($"Msg_name" =!= lag($"Msg_name", 1).over(window), lag("TimeStamp", 1).over(window))
      .otherwise(lit(null)))
  .withColumn("latency_time",
    when($"TimeStamp_start".isNotNull,
      round((col("TimeStamp").cast("double") - col("TimeStamp_start").cast("double")) * 1e3, 2))
      .otherwise(lit(null)))
  .withColumnRenamed("TimeStamp", "TimeStamp_end")
  .withColumn("patter_name", lit("pattern1"))
  .na.drop()
df_only_pattern.orderBy("TimeStamp_start").show(false)
This gives:
+--------+------+--------------------------+--------------------------+------------+-----------+
|Msg_name|Id_num|TimeStamp_end |TimeStamp_start |latency_time|patter_name|
+--------+------+--------------------------+--------------------------+------------+-----------+
|end |304875|2021-10-25 21:26:23.48759 |2021-10-25 21:26:23.48667 |0.92 |pattern1 |
|start |304875|2021-10-25 21:26:23.509683|2021-10-25 21:26:23.48759 |22.09 |pattern1 |
|end |304875|2021-10-25 21:26:23.509689|2021-10-25 21:26:23.509683|0.01 |pattern1 |
|end |201345|2021-10-25 21:26:23.510159|2021-10-25 21:26:23.510156|0.0 |pattern1 |
|start |201345|2021-10-25 21:26:23.510333|2021-10-25 21:26:23.510159|0.17 |pattern1 |
|end |201345|2021-10-25 21:26:23.513177|2021-10-25 21:26:23.510335|2.84 |pattern1 |
|start |201345|2021-10-25 21:26:23.513187|2021-10-25 21:26:23.513177|0.01 |pattern1 |
+--------+------+--------------------------+--------------------------+------------+-----------+
I can achieve the desired df with Python pandas using groupby and a loop within each group, but that seems impossible in Spark.
You can take the "end" messages whose preceding row is a "start":
import org.apache.spark.sql.types.TimestampType

latenies_df
  // window is the same as in the question: Window.partitionBy("Id_num").orderBy("TimeStamp")
  .withColumn("TimeStamp_start",
    when(lag($"Msg_name", 1).over(window) === lit("start"), lag($"TimeStamp", 1).over(window))
      .otherwise(lit(null).cast(TimestampType))
  )
  // keep only the "end" rows that directly follow a "start"
  .where($"Msg_name" === lit("end"))
  .where($"TimeStamp_start".isNotNull)
  .select(
    lit("pattern1").alias("patter_name"),
    $"TimeStamp_start",
    $"TimeStamp".alias("Timestamp_end"),
    $"Id_num"
  )
Result:
+-----------+--------------------------+--------------------------+------+
|patter_name|TimeStamp_start |Timestamp_end |Id_num|
+-----------+--------------------------+--------------------------+------+
|pattern1 |2021-10-25 21:26:23.48667 |2021-10-25 21:26:23.48759 |304875|
|pattern1 |2021-10-25 21:26:23.509683|2021-10-25 21:26:23.509689|304875|
|pattern1 |2021-10-25 21:26:23.510156|2021-10-25 21:26:23.510159|201345|
|pattern1 |2021-10-25 21:26:23.510335|2021-10-25 21:26:23.513177|201345|
+-----------+--------------------------+--------------------------+------+
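If you also need the latency in milliseconds, as in your own attempt, you can fold the same computation into the select. A minimal sketch reusing latenies_df and window from above (the column name latency_time is just carried over from your attempt):

import org.apache.spark.sql.types.TimestampType

latenies_df
  .withColumn("TimeStamp_start",
    when(lag($"Msg_name", 1).over(window) === lit("start"), lag($"TimeStamp", 1).over(window))
      .otherwise(lit(null).cast(TimestampType))
  )
  .where($"Msg_name" === lit("end") && $"TimeStamp_start".isNotNull)
  .select(
    lit("pattern1").alias("patter_name"),
    $"TimeStamp_start",
    $"TimeStamp".alias("Timestamp_end"),
    $"Id_num",
    // difference of the two timestamps in milliseconds, rounded to 2 decimals
    round(($"TimeStamp".cast("double") - $"TimeStamp_start".cast("double")) * 1e3, 2).alias("latency_time")
  )
  .show(false)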