How to join two pyspark dataframes in python on a condition while changing column value on match?
I have two dataframes like this:
df1 = spark.createDataFrame([(1, 11, 1999, 1999, None), (2, 22, 2000, 2000, 44), (3, 33, 2001, 2001, None)], ['id', 't', 'year', 'new_date', 'rev_t'])
df2 = spark.createDataFrame([(2, 44, 2022, 2022, None), (2, 55, 2001, 2001, 88)], ['id', 't', 'year', 'new_date', 'rev_t'])
df1.show()
df2.show()
+---+---+----+--------+-----+
| id| t|year|new_date|rev_t|
+---+---+----+--------+-----+
| 1| 11|1999| 1999| null|
| 2| 22|2000| 2000| 44|
| 3| 33|2001| 2001| null|
+---+---+----+--------+-----+
+---+---+----+--------+-----+
| id| t|year|new_date|rev_t|
+---+---+----+--------+-----+
| 2| 44|2022| 2022| null|
| 2| 55|2001| 2001| 88|
+---+---+----+--------+-----+
I want to join them so that if df2.t == df1.rev_t, then new_date in the resulting dataframe is updated to df2.year.
So the result should look like this:
+---+---+----+--------+-----+
| id| t|year|new_date|rev_t|
+---+---+----+--------+-----+
| 1| 11|1999| 1999| null|
| 2| 22|2000| 2022| 44|
| 2| 44|2022| 2022| null|
| 2| 55|2001| 2001| 88|
| 3| 33|2001| 2001| null|
+---+---+----+--------+-----+
To update a column of df1 from df2, you can use a left join plus the coalesce function on the column you want to update, in this case new_date. From your expected output, it also looks like you want to append the rows of df2, so union the join result with df2:
from pyspark.sql import functions as F

result = (df1.join(df2.selectExpr("t as rev_t", "new_date as df2_new_date"), ["rev_t"], "left")
          # take df2's value where the join matched, otherwise keep df1's new_date
          .withColumn("new_date", F.coalesce("df2_new_date", "new_date"))
          # restore df1's column order (this also drops df2_new_date), then append df2's rows
          .select(*df1.columns).union(df2)
          )
result.show()
#+---+---+----+--------+-----+
#| id| t|year|new_date|rev_t|
#+---+---+----+--------+-----+
#| 1| 11|1999| 1999| null|
#| 3| 33|2001| 2001| null|
#| 2| 22|2000| 2022| 44|
#| 2| 44|2022| 2022| null|
#| 2| 55|2001| 2001| 88|
#+---+---+----+--------+-----+