检查 PySpark 列值是否存在于另一个数据框列值中

Check if PySaprk column values exists in another dataframe column values

我试图找出条件来检查一个 PySpark 数据帧的值是否存在于另一个 PySpark 数据帧中,如果存在,则提取该值并再次比较。我正在考虑用 when() 函数做多个 withColumn()

例如我的两个数据帧可以是这样的:

df1
| id    | value |
| ----- | ----  |
| hello | 1111  |
| world | 2222  |

df2
| id     | value |
| ------ | ----  |
| hello  | 1111  |
| world  | 3333  |
| people | 2222  |

而我希望获得的结果是首先检查 df1.id 的值是否存在于 df2.id 中,如果为真 return 我就是 df2.value,例如我在尝试类似的东西:

df1 = df1.withColumn("df2_value", when(df1.id == df2.id, df2.value))

所以我得到类似的东西:

df1
| id    | value | df2_value |
| ----- | ----  | --------- |
| hello | 1111  | 1111      |
| world | 2222  | 3333      |

所以现在我可以在 df1 数据框中的这两个值列和 return 中的布尔列(10)之间进行另一次检查一个新的数据框。

我希望得到的结果是这样的:

df3
| id    | value | df2_value | match |
| ----- | ----  | --------- | ----- |
| hello | 1111  | 1111      | 1     |
| world | 2222  | 3333      | 0     |

id 上将 df1df2 左连接,然后在 id 之外的所有 df2 列前加上 df2_*:

from pyspark.sql import functions as F

df1 = spark.createDataFrame([("hello", 1111), ("world", 2222)], ["id", "value"])
df2 = spark.createDataFrame([("hello", 1111), ("world", 3333), ("people", 2222)], ["id", "value"])

df = df1.join(
    df2.select("id", *[F.col(c).alias(f"df2_{c}") for c in df2.columns if c != 'id']),
    ["id"],
    "left"
)

然后使用 functools.reduce 你可以构造一个布尔表达式来检查列是否在 2 个数据框中匹配,如下所示:

from functools import reduce

check_expr = reduce(
    lambda acc, x: acc & (F.col(x) == F.col(f"df2_{x}")),
    [c for c in df1.columns if c != 'id'],
    F.lit(True)
)
    
df.withColumn("match", check_expr.cast("int")).show()
#+-----+-----+---------+-----+
#|   id|value|df2_value|match|
#+-----+-----+---------+-----+
#|hello| 1111|     1111|    1|
#|world| 2222|     3333|    0|
#+-----+-----+---------+-----+