Pyspark:匹配来自两个不同数据框的列并增加价值
Pyspark: match columns from two different dataframes and add value
我正在尝试比较不同数据框中存在的两列的值,以根据条件的匹配创建新的数据框:
df1=
| id |
| -- |
| 1 |
| 2 |
| 3 |
| 4 |
| 5 |
df2=
| id |
| -- |
| 2 |
| 5 |
| 1 |
所以,我想在df1的字段中存在df2的字段时,在is_used字段中添加一个'x',否则添加'NA',以生成结果数据帧像这样:
df3 =
| id | is_used |
| -- | ------- |
| 1 | X |
| 2 | X |
| 3 | NA |
| 4 | NA |
| 5 | X |
我试过这种方式,但选择标准在所有列中都放置了一个“X”:
df3 = df3.withColumn('is_used', F.when(
condition = (F.arrays_overlap(F.array(df1.id), F.array(df2.id))) == False,
value = 'NA'
).otherwise('X'))
如有任何帮助,我将不胜感激
试试下面的代码,它会给你类似的结果,你可以进行其余的更改:
df3 = df1.alias("df1").\
join(df2.alias("df2"), (df1.id==df2.id), how='left').\
withColumn('is_true', F.when(df1.id == df2.id,F.lit("X")).otherwise(F.lit("NA"))).\
select("df1.*","is_true")
df3.show()
尝试 fullouter
加入:
df3 = (
df1.join(df2.alias("df2"), df1.id == df2.id, "fullouter")
.withColumn(
"is_used",
F.when(F.col("df2.id").isNotNull(), F.lit("X")).otherwise(F.lit("NA")),
)
.drop(F.col("df2.id"))
.orderBy(F.col("id"))
)
结果:
+---+-------+
|id |is_used|
+---+-------+
|1 |X |
|2 |X |
|3 |NA |
|4 |NA |
|5 |X |
+---+-------+
首先,我要感谢贡献代码的人,这对理解正在发生的事情非常有用。
问题在于,当尝试执行 df1.id == df2.id 时,Spark 将两列推断为一个列,因为它们具有相同的名称,因此所有迭代的结果始终为 True .
只需重命名我想要比较的字段,它对我来说完全有效。
代码如下:
df2 = df2.withColumnRenamed("id", "id1")
df3 = df1.alias("df1").join(df2.alias("df2"),
(df1.id == df2.id1), "left")
df3 = df3.withColumn("is_used", F.when(df1.id == df2.id1),
"X").otherwise("NA")
df3 = df3.drop("id1")
我正在尝试比较不同数据框中存在的两列的值,以根据条件的匹配创建新的数据框:
df1=
| id |
| -- |
| 1 |
| 2 |
| 3 |
| 4 |
| 5 |
df2=
| id |
| -- |
| 2 |
| 5 |
| 1 |
所以,我想在df1的字段中存在df2的字段时,在is_used字段中添加一个'x',否则添加'NA',以生成结果数据帧像这样:
df3 =
| id | is_used |
| -- | ------- |
| 1 | X |
| 2 | X |
| 3 | NA |
| 4 | NA |
| 5 | X |
我试过这种方式,但选择标准在所有列中都放置了一个“X”:
df3 = df3.withColumn('is_used', F.when(
condition = (F.arrays_overlap(F.array(df1.id), F.array(df2.id))) == False,
value = 'NA'
).otherwise('X'))
如有任何帮助,我将不胜感激
试试下面的代码,它会给你类似的结果,你可以进行其余的更改:
df3 = df1.alias("df1").\
join(df2.alias("df2"), (df1.id==df2.id), how='left').\
withColumn('is_true', F.when(df1.id == df2.id,F.lit("X")).otherwise(F.lit("NA"))).\
select("df1.*","is_true")
df3.show()
尝试 fullouter
加入:
df3 = (
df1.join(df2.alias("df2"), df1.id == df2.id, "fullouter")
.withColumn(
"is_used",
F.when(F.col("df2.id").isNotNull(), F.lit("X")).otherwise(F.lit("NA")),
)
.drop(F.col("df2.id"))
.orderBy(F.col("id"))
)
结果:
+---+-------+
|id |is_used|
+---+-------+
|1 |X |
|2 |X |
|3 |NA |
|4 |NA |
|5 |X |
+---+-------+
首先,我要感谢贡献代码的人,这对理解正在发生的事情非常有用。
问题在于,当尝试执行 df1.id == df2.id 时,Spark 将两列推断为一个列,因为它们具有相同的名称,因此所有迭代的结果始终为 True .
只需重命名我想要比较的字段,它对我来说完全有效。
代码如下:
df2 = df2.withColumnRenamed("id", "id1")
df3 = df1.alias("df1").join(df2.alias("df2"),
(df1.id == df2.id1), "left")
df3 = df3.withColumn("is_used", F.when(df1.id == df2.id1),
"X").otherwise("NA")
df3 = df3.drop("id1")