加入 2 个数据框 pyspark
Joining 2 dataframes pyspark
我是 Pyspark 的新手。
我在 2 table 秒内有这样的数据,如下所示。我正在使用数据框。
表 1:
Id
Amount
Date
1
£100
01/04/2021
1
£50
08/04/2021
2
£60
02/04/2021
2
£20
06/05/2021
表 2:
Id
Status
Date
1
S1
01/04/2021
1
S2
05/04/2021
1
S3
10/04/2021
2
S1
02/04/2021
2
S2
10/04/2021
我需要加入上面的 2 个数据帧以产生如下所示的输出。
对于 table 1 中的每条记录,我们需要从 table 2 中获取截至 Date
有效的记录,反之亦然。例如,table1
在 08/04/2021
上有 Id=1
的 £50
,但 table 2 在 05/04/2021
上有 Id=1
的记录,其中 status更改为 S2
。因此,08/04/2021
的状态为 S2
。这就是我不确定如何在连接条件中给出以获得此输出的内容
实现此目标的有效方法是什么?
预期输出:
Id
Status
Date
Amount
1
S1
01/04/2021
£100
1
S2
05/04/2021
£100
1
S2
08/04/2021
£50
1
S3
10/04/2021
£50
2
S1
02/04/2021
£60
2
S2
10/04/2021
£60
2
S2
06/05/2021
£20
对 Id
和 Date
使用完全连接,然后使用 lag
window 函数从 Status
和 Amount
中获取值最接近 Date
行的先例:
from pyspark.sql import Window
import pyspark.sql.functions as F
w = Window.partitionBy("Id").orderBy(F.to_date("Date", "dd/MM/yyyy"))
joined_df = df1.join(df2, ["Id", "Date"], "full").withColumn(
"Status",
F.coalesce(F.col("Status"), F.lag("Status").over(w))
).withColumn(
"Amount",
F.coalesce(F.col("Amount"), F.lag("Amount").over(w))
)
joined_df.show()
#+---+----------+------+------+
#| Id| Date|Amount|Status|
#+---+----------+------+------+
#| 1|01/04/2021| £100| S1|
#| 1|05/04/2021| £100| S2|
#| 1|08/04/2021| £50| S2|
#| 1|10/04/2021| £50| S3|
#| 2|02/04/2021| £60| S1|
#| 2|10/04/2021| £60| S2|
#| 2|06/05/2021| £20| S2|
#+---+----------+------+------+
我是 Pyspark 的新手。 我在 2 table 秒内有这样的数据,如下所示。我正在使用数据框。
表 1:
Id | Amount | Date |
---|---|---|
1 | £100 | 01/04/2021 |
1 | £50 | 08/04/2021 |
2 | £60 | 02/04/2021 |
2 | £20 | 06/05/2021 |
表 2:
Id | Status | Date |
---|---|---|
1 | S1 | 01/04/2021 |
1 | S2 | 05/04/2021 |
1 | S3 | 10/04/2021 |
2 | S1 | 02/04/2021 |
2 | S2 | 10/04/2021 |
我需要加入上面的 2 个数据帧以产生如下所示的输出。
对于 table 1 中的每条记录,我们需要从 table 2 中获取截至 Date
有效的记录,反之亦然。例如,table1
在 08/04/2021
上有 Id=1
的 £50
,但 table 2 在 05/04/2021
上有 Id=1
的记录,其中 status更改为 S2
。因此,08/04/2021
的状态为 S2
。这就是我不确定如何在连接条件中给出以获得此输出的内容
实现此目标的有效方法是什么?
预期输出:
Id | Status | Date | Amount |
---|---|---|---|
1 | S1 | 01/04/2021 | £100 |
1 | S2 | 05/04/2021 | £100 |
1 | S2 | 08/04/2021 | £50 |
1 | S3 | 10/04/2021 | £50 |
2 | S1 | 02/04/2021 | £60 |
2 | S2 | 10/04/2021 | £60 |
2 | S2 | 06/05/2021 | £20 |
对 Id
和 Date
使用完全连接,然后使用 lag
window 函数从 Status
和 Amount
中获取值最接近 Date
行的先例:
from pyspark.sql import Window
import pyspark.sql.functions as F
w = Window.partitionBy("Id").orderBy(F.to_date("Date", "dd/MM/yyyy"))
joined_df = df1.join(df2, ["Id", "Date"], "full").withColumn(
"Status",
F.coalesce(F.col("Status"), F.lag("Status").over(w))
).withColumn(
"Amount",
F.coalesce(F.col("Amount"), F.lag("Amount").over(w))
)
joined_df.show()
#+---+----------+------+------+
#| Id| Date|Amount|Status|
#+---+----------+------+------+
#| 1|01/04/2021| £100| S1|
#| 1|05/04/2021| £100| S2|
#| 1|08/04/2021| £50| S2|
#| 1|10/04/2021| £50| S3|
#| 2|02/04/2021| £60| S1|
#| 2|10/04/2021| £60| S2|
#| 2|06/05/2021| £20| S2|
#+---+----------+------+------+