有没有一种方法可以将两个时间戳上的数据集与偏移量连接起来,使其连接 time_1 和 time_2,其中 time_2 比 time_1 早 2 小时?

Is there a way to join two datasets on timestamp with an offset such that it connects time_1 with time_2 where time_2 is 2hrs earlier than time_1?

我正在尝试根据预定行程前 2 小时的天气预测延误。我有一个旅行数据集(调用 df1)和一个天气数据集(调用 df2)。为了预测延迟,我试图加入 df1 和 df2,偏移量为 2 小时。也就是想看预定出行数据前2小时的天气数据。数据的配对向下视图看起来像这样

示例 df1(行程数据):

travel_data location departure_time delayed
blah KPHX 2015-04-23T15:02:00.000+0000 1
bleh KRDU 2015-04-27T15:19:00.000+0000 0

示例 df2(天气数据):

location report_time weather_data
KPHX 2015-01-01 01:53:00 blih
KRDU 2015-01-01 09:53:00 bloh

我想首先在位置上加入数据,然后在时间戳数据上加入最少 2 小时的偏移量。如果有多个天气报告比出发时间早于2小时以上,我想加入最接近2小时偏移的旅行数据。

到目前为止我用过[​​=17=]

joinedDF = airlines_6m_recode.join(weather_filtered, (col("location") == col("location")) & (col("departure_time") == (col("report_date") + f.expr('INTERVAL 2 HOURS'))), "inner")

这仅适用于出发时间和(报告日期 - 2 小时)完全匹配的时间,因此我丢失了大部分数据。有没有办法加入 2 小时缓冲区之外的下一个最接近的报告日期?

我研究了 window 函数,但它们没有描述如何进行连接。

将连接条件更改为 >= 并在按位置分区后获得最大报告时间戳。

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# 1.Join as per conditions
# 2. Partition by location, order by report_ts desc, add row_number
# 3. Filter row_number == 1
joinedDF = airlines_6m_recode.join(
  weather_filtered,
  (airlines_6m_recode["location"] == weather_filtered["location"]) & (weather_filtered["report_time_ts"] <= airlines_6m_recode["departure_time_ts"] - F.expr("INTERVAL 2 HOURS"))
  , "inner")\
            .withColumn("row_number", F.row_number().over(Window.partitionBy(airlines_6m_recode['location'])\
            .orderBy(weather_filtered["report_time_ts"].desc())))

# Just to Print Intermediate result.
joinedDF.show()

joinedDF.filter('row_number == 1').show()