连接两个时间序列数据帧以获得 PySpark 中每个左侧条目的最新右侧条目
Join two timeseries dataframes to get the most recent right entry for each left entry in PySpark
我有两个 Sparks 数据帧:
df1
每个 id
和 date
:
一个条目
|date |id |
+-----------+-----+
|2021-11-15 | 1|
|2021-11-14 | 1|
|2021-11-15 | 2|
|2021-11-14 | 2|
|2021-11-15 | 3|
|2021-11-14 | 3|
df2
有多个日志条目:
|date |id |
+-----------+-----+
|2021-11-13 | 1|
|2021-11-13 | 1|
|2021-11-13 | 3|
|2021-11-14 | 1|
|2021-11-14 | 1|
|2021-11-14 | 1|
|2021-11-14 | 1|
|2021-11-15 | 1|
|2021-11-15 | 1|
如何加入这些 dfs,以便根据 id
和 date
df2
获得最新的可能条目(日期(df2)应该 <= 日期) ?
|date |id | date(df2)|
+-----------+------+------------+
|2021-11-15 | 1 | 2021-11-15 |
|2021-11-14 | 1 | 2021-11-14 |
|2021-11-15 | 2 | null |
|2021-11-14 | 2 | null |
|2021-11-15 | 3 | 2021-11-13 |
|2021-11-14 | 3 | 2021-11-13 |
谢谢
进入数字
使用连接然后按 df1.id
和 df2.date
分组并使用条件聚合获得最大 df2.date <= df1.date
import pyspark.sql.functions as F
result_df = df1.join(
df2.withColumnRenamed("date", "df2_date"),
["id"],
"left"
).groupBy("id", "date").agg(
F.max(
F.when(F.col("df2_date") <= F.col("date"), F.col("df2_date"))
).alias("df2_date")
)
result_df.show()
#+---+----------+----------+
#| id| date| df2_date|
#+---+----------+----------+
#| 1|2021-11-14|2021-11-14|
#| 1|2021-11-15|2021-11-15|
#| 2|2021-11-14| null|
#| 2|2021-11-15| null|
#| 3|2021-11-14|2021-11-13|
#| 3|2021-11-15|2021-11-13|
#+---+----------+----------+
我有两个 Sparks 数据帧:
df1
每个 id
和 date
:
|date |id |
+-----------+-----+
|2021-11-15 | 1|
|2021-11-14 | 1|
|2021-11-15 | 2|
|2021-11-14 | 2|
|2021-11-15 | 3|
|2021-11-14 | 3|
df2
有多个日志条目:
|date |id |
+-----------+-----+
|2021-11-13 | 1|
|2021-11-13 | 1|
|2021-11-13 | 3|
|2021-11-14 | 1|
|2021-11-14 | 1|
|2021-11-14 | 1|
|2021-11-14 | 1|
|2021-11-15 | 1|
|2021-11-15 | 1|
如何加入这些 dfs,以便根据 id
和 date
df2
获得最新的可能条目(日期(df2)应该 <= 日期) ?
|date |id | date(df2)|
+-----------+------+------------+
|2021-11-15 | 1 | 2021-11-15 |
|2021-11-14 | 1 | 2021-11-14 |
|2021-11-15 | 2 | null |
|2021-11-14 | 2 | null |
|2021-11-15 | 3 | 2021-11-13 |
|2021-11-14 | 3 | 2021-11-13 |
谢谢 进入数字
使用连接然后按 df1.id
和 df2.date
分组并使用条件聚合获得最大 df2.date <= df1.date
import pyspark.sql.functions as F
result_df = df1.join(
df2.withColumnRenamed("date", "df2_date"),
["id"],
"left"
).groupBy("id", "date").agg(
F.max(
F.when(F.col("df2_date") <= F.col("date"), F.col("df2_date"))
).alias("df2_date")
)
result_df.show()
#+---+----------+----------+
#| id| date| df2_date|
#+---+----------+----------+
#| 1|2021-11-14|2021-11-14|
#| 1|2021-11-15|2021-11-15|
#| 2|2021-11-14| null|
#| 2|2021-11-15| null|
#| 3|2021-11-14|2021-11-13|
#| 3|2021-11-15|2021-11-13|
#+---+----------+----------+