Find nearest Timestamp on another column and add value in a new column PySpark

I have a merged DataFrame containing two timestamp columns. I want to find the nearest (forward) timestamp (Timestamp1 -> Timestamp2), get the associated value, and add it to a new column.

TimeStamp1                Value1     TimeStamp2               Value2
2021-11-01T01:55:29.473   131        2021-11-01T01:55:28.205  A
2021-11-01T01:55:30.474   3          2021-11-01T01:55:31.205  B
2021-11-01T05:01:55.247   195        2021-11-01T03:44:14.208  C
2021-11-01T05:01:56.247   67         2021-11-01T05:41:56.205  D
2021-11-01T09:41:30.264   131        2021-11-01T09:41:29.405  E
2021-11-01T09:41:32.264   67         2021-11-01T09:41:35.205  F

Expected output:

TimeStamp1                Value1     Value2
2021-11-01T01:55:29.473   131        B
2021-11-01T01:55:30.474   3          B
2021-11-01T05:01:55.247   195        D
2021-11-01T05:01:56.247   67         D
2021-11-01T09:41:30.264   131        F
2021-11-01T09:41:32.264   67         F

I am using PySpark. I have found some approaches, but they are written in pandas.

The transformation you are looking for can be achieved in two steps:

  1. Generate all possible combinations where df["TimeStamp2"] >= df["TimeStamp1"], using a conditional self-join. This forms our candidate_df.
  2. Prune candidate_df down to the expected rows by finding, for each TimeStamp1, the row with the smallest TimeStamp2: partition candidate_df by TimeStamp1, order by TimeStamp2 ascending, and keep the first row.

If you have a threshold for the "maximum nearness" (i.e. a maximum allowed difference between TimeStamp1 and its nearest TimeStamp2), the solution can be optimized to reduce the size of candidate_df, as sketched below.
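
For instance, here is a minimal sketch of that optimization, assuming a hypothetical maximum gap of 60 minutes and reusing the df and aliases from the working example below; the threshold becomes a second join condition:

from pyspark.sql import functions as F

# Hypothetical threshold: only pair TimeStamp1 with TimeStamp2 values that fall
# within 60 minutes after it (tune the interval to your data). The extra
# condition keeps candidate_df far smaller than the unconstrained join.
candidate_df = df.alias("l").join(
    df.alias("r"),
    (F.col("r.TimeStamp2") >= F.col("l.TimeStamp1")) &
    (F.col("r.TimeStamp2") <= F.col("l.TimeStamp1") + F.expr("INTERVAL 60 MINUTES"))
).selectExpr("l.TimeStamp1 as TimeStamp1",
             "l.Value1 as Value1",
             "r.TimeStamp2 as TimeStamp2",
             "r.Value2 as Value2")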

Working example


from datetime import datetime
from pyspark.sql import functions as F
from pyspark.sql import Window

data = [(datetime.strptime("2021-11-01T01:55:29.473", "%Y-%m-%dT%H:%M:%S.%f"), 131, datetime.strptime("2021-11-01T01:55:28.205", "%Y-%m-%dT%H:%M:%S.%f"), "A"),
(datetime.strptime("2021-11-01T01:55:30.474", "%Y-%m-%dT%H:%M:%S.%f"), 3,   datetime.strptime("2021-11-01T01:55:31.205", "%Y-%m-%dT%H:%M:%S.%f"), "B"),
(datetime.strptime("2021-11-01T05:01:55.247", "%Y-%m-%dT%H:%M:%S.%f"), 195, datetime.strptime("2021-11-01T03:44:14.208", "%Y-%m-%dT%H:%M:%S.%f"), "C"),
(datetime.strptime("2021-11-01T05:01:56.247", "%Y-%m-%dT%H:%M:%S.%f"), 67,  datetime.strptime("2021-11-01T05:41:56.205", "%Y-%m-%dT%H:%M:%S.%f"), "D"),
(datetime.strptime("2021-11-01T09:41:30.264", "%Y-%m-%dT%H:%M:%S.%f"), 131, datetime.strptime("2021-11-01T09:41:29.405", "%Y-%m-%dT%H:%M:%S.%f"), "E"),
(datetime.strptime("2021-11-01T09:41:32.264", "%Y-%m-%dT%H:%M:%S.%f"), 67,  datetime.strptime("2021-11-01T09:41:35.205", "%Y-%m-%dT%H:%M:%S.%f"), "F"),]

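# assumes an active SparkSession is already available as `spark`
# (as in spark-shell, pyspark, or a notebook session)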
df = spark.createDataFrame(data, ("TimeStamp1", "Value1", "TimeStamp2", "Value2",))

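# Step 1: conditional self-join keeping every pair where TimeStamp2 is at or after TimeStamp1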
candidate_df = df.alias("l").join(df.alias("r"), F.col("r.TimeStamp2") >= F.col("l.TimeStamp1"))\
                 .selectExpr("l.TimeStamp1 as TimeStamp1", 
                             "l.Value1 as Value1", 
                             "r.TimeStamp2 as TimeStamp2", 
                             "r.Value2 as Value2")

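# Step 2: within each TimeStamp1 partition, rank candidates by TimeStamp2 and keep the earliest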
window_spec = Window.partitionBy("TimeStamp1").orderBy("TimeStamp2")

candidate_df.withColumn("rn", F.row_number().over(window_spec))\
            .filter(F.col("rn") == 1)\
            .drop("rn", "TimeStamp2")\
            .show(200, False)

Output

+-----------------------+------+------+
|TimeStamp1             |Value1|Value2|
+-----------------------+------+------+
|2021-11-01 01:55:29.473|131   |B     |
|2021-11-01 01:55:30.474|3     |B     |
|2021-11-01 05:01:55.247|195   |D     |
|2021-11-01 05:01:56.247|67    |D     |
|2021-11-01 09:41:30.264|131   |F     |
|2021-11-01 09:41:32.264|67    |F     |
+-----------------------+------+------+
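
As a side note, the pruning in step 2 can also be written without a window function. Below is a sketch that relies on Spark comparing struct values field by field, so the minimum of a (TimeStamp2, Value2) struct belongs to the row with the earliest TimeStamp2:

from pyspark.sql import functions as F

# Alternative to row_number(): aggregate per (TimeStamp1, Value1) group, take the
# minimum (TimeStamp2, Value2) struct, then unpack its Value2 field.
nearest_df = (candidate_df
              .groupBy("TimeStamp1", "Value1")
              .agg(F.min(F.struct("TimeStamp2", "Value2")).alias("nearest"))
              .select("TimeStamp1", "Value1", F.col("nearest.Value2").alias("Value2")))

nearest_df.show(200, False)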