有没有一种方法可以将两个时间戳上的数据集与偏移量连接起来,使其连接 time_1 和 time_2,其中 time_2 比 time_1 早 2 小时?
Is there a way to join two datasets on timestamp with an offset such that it connects time_1 with time_2 where time_2 is 2hrs earlier than time_1?
我正在尝试根据预定行程前 2 小时的天气预测延误。我有一个旅行数据集(调用 df1)和一个天气数据集(调用 df2)。为了预测延迟,我试图加入 df1 和 df2,偏移量为 2 小时。也就是想看预定出行数据前2小时的天气数据。数据的配对向下视图看起来像这样
示例 df1(行程数据):
travel_data
location
departure_time
delayed
blah
KPHX
2015-04-23T15:02:00.000+0000
1
bleh
KRDU
2015-04-27T15:19:00.000+0000
0
示例 df2(天气数据):
location
report_time
weather_data
KPHX
2015-01-01 01:53:00
blih
KRDU
2015-01-01 09:53:00
bloh
我想首先在位置上加入数据,然后在时间戳数据上加入最少 2 小时的偏移量。如果有多个天气报告比出发时间早于2小时以上,我想加入最接近2小时偏移的旅行数据。
到目前为止我用过[=17=]
joinedDF = airlines_6m_recode.join(weather_filtered, (col("location") == col("location")) & (col("departure_time") == (col("report_date") + f.expr('INTERVAL 2 HOURS'))), "inner")
这仅适用于出发时间和(报告日期 - 2 小时)完全匹配的时间,因此我丢失了大部分数据。有没有办法加入 2 小时缓冲区之外的下一个最接近的报告日期?
我研究了 window 函数,但它们没有描述如何进行连接。
将连接条件更改为 >= 并在按位置分区后获得最大报告时间戳。
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# 1.Join as per conditions
# 2. Partition by location, order by report_ts desc, add row_number
# 3. Filter row_number == 1
joinedDF = airlines_6m_recode.join(
weather_filtered,
(airlines_6m_recode["location"] == weather_filtered["location"]) & (weather_filtered["report_time_ts"] <= airlines_6m_recode["departure_time_ts"] - F.expr("INTERVAL 2 HOURS"))
, "inner")\
.withColumn("row_number", F.row_number().over(Window.partitionBy(airlines_6m_recode['location'])\
.orderBy(weather_filtered["report_time_ts"].desc())))
# Just to Print Intermediate result.
joinedDF.show()
joinedDF.filter('row_number == 1').show()
我正在尝试根据预定行程前 2 小时的天气预测延误。我有一个旅行数据集(调用 df1)和一个天气数据集(调用 df2)。为了预测延迟,我试图加入 df1 和 df2,偏移量为 2 小时。也就是想看预定出行数据前2小时的天气数据。数据的配对向下视图看起来像这样
示例 df1(行程数据):
travel_data | location | departure_time | delayed |
---|---|---|---|
blah | KPHX | 2015-04-23T15:02:00.000+0000 | 1 |
bleh | KRDU | 2015-04-27T15:19:00.000+0000 | 0 |
示例 df2(天气数据):
location | report_time | weather_data |
---|---|---|
KPHX | 2015-01-01 01:53:00 | blih |
KRDU | 2015-01-01 09:53:00 | bloh |
我想首先在位置上加入数据,然后在时间戳数据上加入最少 2 小时的偏移量。如果有多个天气报告比出发时间早于2小时以上,我想加入最接近2小时偏移的旅行数据。
到目前为止我用过[=17=]
joinedDF = airlines_6m_recode.join(weather_filtered, (col("location") == col("location")) & (col("departure_time") == (col("report_date") + f.expr('INTERVAL 2 HOURS'))), "inner")
这仅适用于出发时间和(报告日期 - 2 小时)完全匹配的时间,因此我丢失了大部分数据。有没有办法加入 2 小时缓冲区之外的下一个最接近的报告日期?
我研究了 window 函数,但它们没有描述如何进行连接。
将连接条件更改为 >= 并在按位置分区后获得最大报告时间戳。
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# 1.Join as per conditions
# 2. Partition by location, order by report_ts desc, add row_number
# 3. Filter row_number == 1
joinedDF = airlines_6m_recode.join(
weather_filtered,
(airlines_6m_recode["location"] == weather_filtered["location"]) & (weather_filtered["report_time_ts"] <= airlines_6m_recode["departure_time_ts"] - F.expr("INTERVAL 2 HOURS"))
, "inner")\
.withColumn("row_number", F.row_number().over(Window.partitionBy(airlines_6m_recode['location'])\
.orderBy(weather_filtered["report_time_ts"].desc())))
# Just to Print Intermediate result.
joinedDF.show()
joinedDF.filter('row_number == 1').show()