How can I join measure data and release information?
I am trying to join two PySpark DataFrames. One contains my measurement data, the other contains release information for my measurement devices. I want to add the release information to the measurement data like this:

Input:
Measurement data:
logger_id | measure_date | data
---|---|---
394 | 2018-07-09T09:25:40 | some data
394 | 2018-08-23T09:51:18 | other data
394 | 2019-04-23T09:51:18 | other data
398 | 2018-01-10T12:15:53 | more data
398 | 2019-10-24T08:10:25 | other data
Release data:
logger_id | release_date | release_information
---|---|---
394 | 2018-07-01T00:00:00 | release information
394 | 2019-04-01T00:00:00 | release information
398 | 2018-01-01T00:00:00 | release information
398 | 2019-07-01T00:00:00 | release information
I want an output like this:
logger_id | measure_date | data | release_date | release_information
---|---|---|---|---
394 | 2018-07-09T09:25:40 | some data | 2018-07-01T00:00:00 | release information
394 | 2018-08-23T09:51:18 | other data | 2018-07-01T00:00:00 | release information
394 | 2019-04-23T09:51:18 | other data | 2019-04-01T00:00:00 | release information
398 | 2018-01-10T12:15:53 | more data | 2018-01-01T00:00:00 | release information
398 | 2019-10-24T08:10:25 | other data | 2019-07-01T00:00:00 | release information
I have already tried:
cond = [release_data.release_date < measure_data.measure_date, release_data.logger_id == measure_data.logger_id]
measure_data.join(release_data, cond, how='fullouter')
But in the resulting DataFrame I get the release rows with 'null' in the columns that come from the measurement DataFrame.
I also considered iterating over my measurement DataFrame and adding the release information row by row, but it is really big and I would like to avoid that.
You can transform release_df to include a column that marks the time until which each release is valid; lead can be used for this. Once release_valid_end is included, the join condition changes to a date comparison that checks whether measure_date falls between release_date and release_valid_end.
from datetime import datetime
from pyspark.sql import functions as F
from pyspark.sql import Window as W

measure_data = [(394, datetime.strptime("2018-07-09T09:25:40", "%Y-%m-%dT%H:%M:%S"), "some data",),
                (394, datetime.strptime("2018-08-23T09:51:18", "%Y-%m-%dT%H:%M:%S"), "other data",),
                (394, datetime.strptime("2019-04-23T09:51:18", "%Y-%m-%dT%H:%M:%S"), "other data",),
                (398, datetime.strptime("2018-01-10T12:15:53", "%Y-%m-%dT%H:%M:%S"), "more data",),
                (398, datetime.strptime("2019-10-24T08:10:25", "%Y-%m-%dT%H:%M:%S"), "other data",), ]
release_data = [(394, datetime.strptime("2018-07-01T00:00:00", "%Y-%m-%dT%H:%M:%S"), "release information",),
                (394, datetime.strptime("2019-04-01T00:00:00", "%Y-%m-%dT%H:%M:%S"), "release information",),
                (398, datetime.strptime("2018-01-01T00:00:00", "%Y-%m-%dT%H:%M:%S"), "release information",),
                (398, datetime.strptime("2019-07-01T00:00:00", "%Y-%m-%dT%H:%M:%S"), "release information",), ]
measure_df = spark.createDataFrame(measure_data, ("logger_id", "measure_date", "data",))
release_df = spark.createDataFrame(release_data, ("logger_id", "release_date", "release_information",))

# Sentinel end date for the latest release of each logger, which has no successor.
world_end_date = datetime.strptime("2999-12-31T00:00:00", "%Y-%m-%dT%H:%M:%S")

# Within each logger, the next release's date ends the current release's validity.
window_spec = W.partitionBy("logger_id").orderBy(F.asc("release_date"))
release_validity_df = release_df.withColumn("release_valid_end",
                                            F.lead("release_date", offset=1, default=world_end_date).over(window_spec))

# Join each measurement to the release whose validity interval contains its measure_date.
(measure_df.join(release_validity_df,
                 ((measure_df["logger_id"] == release_validity_df["logger_id"]) &
                  ((measure_df["measure_date"] >= release_validity_df["release_date"]) &
                   (measure_df["measure_date"] < release_validity_df["release_valid_end"]))))
 ).select(measure_df["logger_id"], "measure_date", "data", "release_date", "release_information").show()
Output:
+---------+-------------------+----------+-------------------+-------------------+
|logger_id| measure_date| data| release_date|release_information|
+---------+-------------------+----------+-------------------+-------------------+
| 398|2018-01-10 12:15:53| more data|2018-01-01 00:00:00|release information|
| 398|2019-10-24 08:10:25|other data|2019-07-01 00:00:00|release information|
| 394|2018-07-09 09:25:40| some data|2018-07-01 00:00:00|release information|
| 394|2018-08-23 09:51:18|other data|2018-07-01 00:00:00|release information|
| 394|2019-04-23 09:51:18|other data|2019-04-01 00:00:00|release information|
+---------+-------------------+----------+-------------------+-------------------+
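As a side note, the same result can be reached without the sentinel end date. Below is a minimal alternative sketch, reusing spark, measure_df, and release_df from above (joined, w, and rn are just illustrative names): left-join on logger_id with release_date <= measure_date, then keep only the most recent matching release per measurement via a row_number window. Unlike the interval join, the left join also keeps measurements that precede every release for their logger, with null release columns.

from pyspark.sql import functions as F
from pyspark.sql import Window as W

# Every release that is not newer than the measurement is a candidate match.
joined = measure_df.join(
    release_df,
    (measure_df["logger_id"] == release_df["logger_id"]) &
    (release_df["release_date"] <= measure_df["measure_date"]),
    how="left",
).drop(release_df["logger_id"])

# Rank the candidates per measurement, newest release first, and keep rank 1.
w = W.partitionBy("logger_id", "measure_date").orderBy(F.desc("release_date"))
(joined.withColumn("rn", F.row_number().over(w))
       .where(F.col("rn") == 1)
       .drop("rn")
       .show())

This variant trades the lead window over the (small) release table for a potentially larger intermediate join, so the interval-join version above is usually the better fit when loggers accumulate many releases.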