如何加入测量数据和发布信息?

How can I join measure data and release information?

我尝试连接两个 pyspark 数据帧。一个包含我的测量数据,另一个包含我的测量设备的发布信息。我想像这样将发布信息添加到测量数据中:

输入:

我想要这样的输出:

logger_id measure_date data release_date release_information
394 2018-07-09T09:25:40 some data 2018-07-01T00:00:00 release information
394 2018-08-23T09:51:18 other data 2018-07-01T00:00:00 release information
394 2019-04-23T09:51:18 other data 2019-04-01T00:00:00 release information
398 2018-01-10T12:15:53 more data 2018-01-01T00:00:00 release information
398 2019-10-24T08:10:25 other data 2019-07-01T00:00:00 release information

我已经试过了

cond = [release_data.release_date < measure_data.measure_date, release_data.logger_id == measure_data.logger_id]
measure_data.join(release_data, cond, how='fullouter')

但是在生成的数据框中,我得到了带有度量数据框'null'列的发布数据

我还考虑过遍历我的 measureddata 数据框并为每一行添加发布信息,但它真的很大,我不想那样做

您可以转换 release_df 以包含一个列,该列查找直到发布有效的时间,为此 lead 可以使用。

包含 release_valid_end 后,连接条件将更改为查找 measure_datemeasure_date 之间的日期比较检查 release_daterelease_valid_end.

from datetime import datetime
from pyspark.sql import functions as F
from pyspark.sql import Window as W

measure_data = [(394, datetime.strptime("2018-07-09T09:25:40", "%Y-%m-%dT%H:%M:%S"), "some data",),
                (394, datetime.strptime("2018-08-23T09:51:18", "%Y-%m-%dT%H:%M:%S"), "other data",),
                (394, datetime.strptime("2019-04-23T09:51:18", "%Y-%m-%dT%H:%M:%S"), "other data",),
                (398, datetime.strptime("2018-01-10T12:15:53", "%Y-%m-%dT%H:%M:%S"), "more data",),
                (398, datetime.strptime("2019-10-24T08:10:25", "%Y-%m-%dT%H:%M:%S"), "other data",), ]

release_data = [(394, datetime.strptime("2018-07-01T00:00:00", "%Y-%m-%dT%H:%M:%S"), "release information",),
                (394, datetime.strptime("2019-04-01T00:00:00", "%Y-%m-%dT%H:%M:%S"), "release information",),
                (398, datetime.strptime("2018-01-01T00:00:00", "%Y-%m-%dT%H:%M:%S"), "release information",),
                (398, datetime.strptime("2019-07-01T00:00:00", "%Y-%m-%dT%H:%M:%S"), "release information",), ]

measure_df = spark.createDataFrame(measure_data, ("logger_id", "measure_date", "data",))
release_df = spark.createDataFrame(release_data, ("logger_id", "release_date", "release_information",))

world_end_date = datetime.strptime("2999-12-31T00:00:00", "%Y-%m-%dT%H:%M:%S")

window_spec = W.partitionBy("logger_id").orderBy(F.asc("release_date"))

release_validity_df = release_df.withColumn("release_valid_end", 
                                            F.lead("release_date", offset=1, default=world_end_date).over(window_spec))

(measure_df.join(release_validity_df, 
                ((measure_df["logger_id"] == release_validity_df["logger_id"]) & 
                 ((measure_df["measure_date"] >= release_validity_df["release_date"]) &
                  (measure_df["measure_date"] < release_validity_df["release_valid_end"]))
                ))
).select(measure_df["logger_id"], "measure_date", "data", "release_date", "release_information").show()

输出

+---------+-------------------+----------+-------------------+-------------------+
|logger_id|       measure_date|      data|       release_date|release_information|
+---------+-------------------+----------+-------------------+-------------------+
|      398|2018-01-10 12:15:53| more data|2018-01-01 00:00:00|release information|
|      398|2019-10-24 08:10:25|other data|2019-07-01 00:00:00|release information|
|      394|2018-07-09 09:25:40| some data|2018-07-01 00:00:00|release information|
|      394|2018-08-23 09:51:18|other data|2018-07-01 00:00:00|release information|
|      394|2019-04-23 09:51:18|other data|2019-04-01 00:00:00|release information|
+---------+-------------------+----------+-------------------+-------------------+