验证在另一个数据框中匹配的一个 pyspark 数据框中的行数据

Validate the row data in one pyspark Dataframe matched in another Dataframe

我有 2 个 Pyspark 数据帧 df1、df2。 df1 和 df2 都包含数百万条记录。

df1 就像:

+-------------------+--------+--------+
|               name|state   | pincode|
+-------------------+--------+--------+
|  CYBEX INTERNATION| HOUSTON| 00530  |
|        FLUID POWER| MEDWAY | 02053  |
|   REFINERY SYSTEMS| FRANCE | 072234 |
|    K N ENTERPRISES| MUMBAI | 100010 |
+-------------------+--------+--------+

df2 就像:

+--------------------+--------+--------+
|               name |state   | pincode|
+--------------------+--------+--------+
|FLUID POWER PVT LTD | MEDWAY | 02053  |
|  CYBEX INTERNATION | HOUSTON| 02356  |
|REFINERY SYSTEMS LTD| MUMBAI | 072234 |
+--------------------+--------+--------+

所以,我想根据名称状态和 Pincode 检查 df1 是否在 df2 上找到,如果经过验证,输出应该是 1,否则 0,df 将是

+-------------------+--------+--------+--------- --+
|               name|state   | pincode|  Validated |
+-------------------+--------+--------+---------- -+
|  CYBEX INTERNATION| HOUSTON| 00530  |     0      |
|        FLUID POWER| MEDWAY | 02053  |     1      |
|   REFINERY SYSTEMS| FRANCE | 072234 |     0      |
|    K N ENTERPRISES| MUMBAI | 100010 |     0      |
+-------------------+--------+--------+------------+

在第一种情况下,df1 Pincode 的第 1 行与任何 df2 Pincode 列都不匹配,因此验证 = 0
在 df1 Pincode 匹配的第 2 行的第二种情况下,状态也匹配并且对于名称列,我使用 Levenshtein 来匹配列名并且最后一行被验证 = 1
在第 3 行 Pincode 匹配但状态不匹配且已验证 = 0
在 4rth Pincode 中不存在并且验证 = 0

我在嵌套 if 中迭代数据时尝试使用 Pandas dataFrame,但是数据太大,迭代不是一个好的选择。

我希望使用 pyspark 和利用并行处理来加快进程,例如:

df_final = df1.withColumn('validated', if some_expression == True THEN 1,ELSE 0)

但无法弄清楚 some_expression,以及如何检查整个 df1 在具有给定列且没有任何迭代的情况下在另一个 df2 上验证。

我遇到过不同的火花问题和类似的问题,但 none 对我有帮助。 任何帮助将不胜感激。如有不明之处请评论。

levenshtein-distance 与左连接结合使用,您可以执行如下操作:

join_condition = (col("df1.pincode") == col("df2.pincode")) \
                 & (levenshtein(col("df1.name"), col("df2.name")) <= 10) \
                 & (col("df1.state") == col("df2.state"))

result_df = df1.alias("df1").join(df2.alias("df2"), join_condition , "left")

result_df.select("df1.*",
              when(col("df2.name").isNotNull(), lit(1)).otherwise(lit(0)).alias("validated")
              ).show()

#+-----------------+-------+-------+---------+
#|             name|  state|pincode|validated|
#+-----------------+-------+-------+---------+
#|CYBEX INTERNATION|HOUSTON|  00530|        0|
#|      FLUID POWER| MEDWAY|  02053|        1|
#| REFINERY SYSTEMS| FRANCE| 072234|        0|
#|  K N ENTERPRISES| MUMBAI| 100010|        0|
#+-----------------+-------+-------+---------+