检查 PySpark 列值是否存在于另一个数据框列值中
Check if PySaprk column values exists in another dataframe column values
我试图找出条件来检查一个 PySpark 数据帧的值是否存在于另一个 PySpark 数据帧中,如果存在,则提取该值并再次比较。我正在考虑用 when()
函数做多个 withColumn()
。
例如我的两个数据帧可以是这样的:
df1
| id | value |
| ----- | ---- |
| hello | 1111 |
| world | 2222 |
df2
| id | value |
| ------ | ---- |
| hello | 1111 |
| world | 3333 |
| people | 2222 |
而我希望获得的结果是首先检查 df1.id
的值是否存在于 df2.id
中,如果为真 return 我就是 df2.value
,例如我在尝试类似的东西:
df1 = df1.withColumn("df2_value", when(df1.id == df2.id, df2.value))
所以我得到类似的东西:
df1
| id | value | df2_value |
| ----- | ---- | --------- |
| hello | 1111 | 1111 |
| world | 2222 | 3333 |
所以现在我可以在 df1
数据框中的这两个值列和 return 中的布尔列(1
或 0
)之间进行另一次检查一个新的数据框。
我希望得到的结果是这样的:
df3
| id | value | df2_value | match |
| ----- | ---- | --------- | ----- |
| hello | 1111 | 1111 | 1 |
| world | 2222 | 3333 | 0 |
在 id
上将 df1
与 df2
左连接,然后在 id
之外的所有 df2 列前加上 df2_*
:
from pyspark.sql import functions as F
df1 = spark.createDataFrame([("hello", 1111), ("world", 2222)], ["id", "value"])
df2 = spark.createDataFrame([("hello", 1111), ("world", 3333), ("people", 2222)], ["id", "value"])
df = df1.join(
df2.select("id", *[F.col(c).alias(f"df2_{c}") for c in df2.columns if c != 'id']),
["id"],
"left"
)
然后使用 functools.reduce
你可以构造一个布尔表达式来检查列是否在 2 个数据框中匹配,如下所示:
from functools import reduce
check_expr = reduce(
lambda acc, x: acc & (F.col(x) == F.col(f"df2_{x}")),
[c for c in df1.columns if c != 'id'],
F.lit(True)
)
df.withColumn("match", check_expr.cast("int")).show()
#+-----+-----+---------+-----+
#| id|value|df2_value|match|
#+-----+-----+---------+-----+
#|hello| 1111| 1111| 1|
#|world| 2222| 3333| 0|
#+-----+-----+---------+-----+
我试图找出条件来检查一个 PySpark 数据帧的值是否存在于另一个 PySpark 数据帧中,如果存在,则提取该值并再次比较。我正在考虑用 when()
函数做多个 withColumn()
。
例如我的两个数据帧可以是这样的:
df1
| id | value |
| ----- | ---- |
| hello | 1111 |
| world | 2222 |
df2
| id | value |
| ------ | ---- |
| hello | 1111 |
| world | 3333 |
| people | 2222 |
而我希望获得的结果是首先检查 df1.id
的值是否存在于 df2.id
中,如果为真 return 我就是 df2.value
,例如我在尝试类似的东西:
df1 = df1.withColumn("df2_value", when(df1.id == df2.id, df2.value))
所以我得到类似的东西:
df1
| id | value | df2_value |
| ----- | ---- | --------- |
| hello | 1111 | 1111 |
| world | 2222 | 3333 |
所以现在我可以在 df1
数据框中的这两个值列和 return 中的布尔列(1
或 0
)之间进行另一次检查一个新的数据框。
我希望得到的结果是这样的:
df3
| id | value | df2_value | match |
| ----- | ---- | --------- | ----- |
| hello | 1111 | 1111 | 1 |
| world | 2222 | 3333 | 0 |
在 id
上将 df1
与 df2
左连接,然后在 id
之外的所有 df2 列前加上 df2_*
:
from pyspark.sql import functions as F
df1 = spark.createDataFrame([("hello", 1111), ("world", 2222)], ["id", "value"])
df2 = spark.createDataFrame([("hello", 1111), ("world", 3333), ("people", 2222)], ["id", "value"])
df = df1.join(
df2.select("id", *[F.col(c).alias(f"df2_{c}") for c in df2.columns if c != 'id']),
["id"],
"left"
)
然后使用 functools.reduce
你可以构造一个布尔表达式来检查列是否在 2 个数据框中匹配,如下所示:
from functools import reduce
check_expr = reduce(
lambda acc, x: acc & (F.col(x) == F.col(f"df2_{x}")),
[c for c in df1.columns if c != 'id'],
F.lit(True)
)
df.withColumn("match", check_expr.cast("int")).show()
#+-----+-----+---------+-----+
#| id|value|df2_value|match|
#+-----+-----+---------+-----+
#|hello| 1111| 1111| 1|
#|world| 2222| 3333| 0|
#+-----+-----+---------+-----+