验证在另一个数据框中匹配的一个 pyspark 数据框中的行数据
Validate the row data in one pyspark Dataframe matched in another Dataframe
我有 2 个 Pyspark 数据帧 df1、df2。 df1 和 df2 都包含数百万条记录。
df1 就像:
+-------------------+--------+--------+
| name|state | pincode|
+-------------------+--------+--------+
| CYBEX INTERNATION| HOUSTON| 00530 |
| FLUID POWER| MEDWAY | 02053 |
| REFINERY SYSTEMS| FRANCE | 072234 |
| K N ENTERPRISES| MUMBAI | 100010 |
+-------------------+--------+--------+
df2 就像:
+--------------------+--------+--------+
| name |state | pincode|
+--------------------+--------+--------+
|FLUID POWER PVT LTD | MEDWAY | 02053 |
| CYBEX INTERNATION | HOUSTON| 02356 |
|REFINERY SYSTEMS LTD| MUMBAI | 072234 |
+--------------------+--------+--------+
所以,我想根据名称状态和 Pincode 检查 df1 是否在 df2 上找到,如果经过验证,输出应该是 1,否则 0,df 将是
+-------------------+--------+--------+--------- --+
| name|state | pincode| Validated |
+-------------------+--------+--------+---------- -+
| CYBEX INTERNATION| HOUSTON| 00530 | 0 |
| FLUID POWER| MEDWAY | 02053 | 1 |
| REFINERY SYSTEMS| FRANCE | 072234 | 0 |
| K N ENTERPRISES| MUMBAI | 100010 | 0 |
+-------------------+--------+--------+------------+
在第一种情况下,df1 Pincode 的第 1 行与任何 df2 Pincode 列都不匹配,因此验证 = 0
在 df1 Pincode 匹配的第 2 行的第二种情况下,状态也匹配并且对于名称列,我使用 Levenshtein 来匹配列名并且最后一行被验证 = 1
在第 3 行 Pincode 匹配但状态不匹配且已验证 = 0
在 4rth Pincode 中不存在并且验证 = 0
我在嵌套 if 中迭代数据时尝试使用 Pandas dataFrame,但是数据太大,迭代不是一个好的选择。
我希望使用 pyspark 和利用并行处理来加快进程,例如:
df_final = df1.withColumn('validated', if some_expression == True THEN 1,ELSE 0)
但无法弄清楚 some_expression,以及如何检查整个 df1 在具有给定列且没有任何迭代的情况下在另一个 df2 上验证。
我遇到过不同的火花问题和类似的问题,但 none 对我有帮助。
任何帮助将不胜感激。如有不明之处请评论。
将 levenshtein-distance
与左连接结合使用,您可以执行如下操作:
join_condition = (col("df1.pincode") == col("df2.pincode")) \
& (levenshtein(col("df1.name"), col("df2.name")) <= 10) \
& (col("df1.state") == col("df2.state"))
result_df = df1.alias("df1").join(df2.alias("df2"), join_condition , "left")
result_df.select("df1.*",
when(col("df2.name").isNotNull(), lit(1)).otherwise(lit(0)).alias("validated")
).show()
#+-----------------+-------+-------+---------+
#| name| state|pincode|validated|
#+-----------------+-------+-------+---------+
#|CYBEX INTERNATION|HOUSTON| 00530| 0|
#| FLUID POWER| MEDWAY| 02053| 1|
#| REFINERY SYSTEMS| FRANCE| 072234| 0|
#| K N ENTERPRISES| MUMBAI| 100010| 0|
#+-----------------+-------+-------+---------+
我有 2 个 Pyspark 数据帧 df1、df2。 df1 和 df2 都包含数百万条记录。
df1 就像:
+-------------------+--------+--------+
| name|state | pincode|
+-------------------+--------+--------+
| CYBEX INTERNATION| HOUSTON| 00530 |
| FLUID POWER| MEDWAY | 02053 |
| REFINERY SYSTEMS| FRANCE | 072234 |
| K N ENTERPRISES| MUMBAI | 100010 |
+-------------------+--------+--------+
df2 就像:
+--------------------+--------+--------+
| name |state | pincode|
+--------------------+--------+--------+
|FLUID POWER PVT LTD | MEDWAY | 02053 |
| CYBEX INTERNATION | HOUSTON| 02356 |
|REFINERY SYSTEMS LTD| MUMBAI | 072234 |
+--------------------+--------+--------+
所以,我想根据名称状态和 Pincode 检查 df1 是否在 df2 上找到,如果经过验证,输出应该是 1,否则 0,df 将是
+-------------------+--------+--------+--------- --+
| name|state | pincode| Validated |
+-------------------+--------+--------+---------- -+
| CYBEX INTERNATION| HOUSTON| 00530 | 0 |
| FLUID POWER| MEDWAY | 02053 | 1 |
| REFINERY SYSTEMS| FRANCE | 072234 | 0 |
| K N ENTERPRISES| MUMBAI | 100010 | 0 |
+-------------------+--------+--------+------------+
在第一种情况下,df1 Pincode 的第 1 行与任何 df2 Pincode 列都不匹配,因此验证 = 0
在 df1 Pincode 匹配的第 2 行的第二种情况下,状态也匹配并且对于名称列,我使用 Levenshtein 来匹配列名并且最后一行被验证 = 1
在第 3 行 Pincode 匹配但状态不匹配且已验证 = 0
在 4rth Pincode 中不存在并且验证 = 0
我在嵌套 if 中迭代数据时尝试使用 Pandas dataFrame,但是数据太大,迭代不是一个好的选择。
我希望使用 pyspark 和利用并行处理来加快进程,例如:
df_final = df1.withColumn('validated', if some_expression == True THEN 1,ELSE 0)
但无法弄清楚 some_expression,以及如何检查整个 df1 在具有给定列且没有任何迭代的情况下在另一个 df2 上验证。
我遇到过不同的火花问题和类似的问题,但 none 对我有帮助。 任何帮助将不胜感激。如有不明之处请评论。
将 levenshtein-distance
与左连接结合使用,您可以执行如下操作:
join_condition = (col("df1.pincode") == col("df2.pincode")) \
& (levenshtein(col("df1.name"), col("df2.name")) <= 10) \
& (col("df1.state") == col("df2.state"))
result_df = df1.alias("df1").join(df2.alias("df2"), join_condition , "left")
result_df.select("df1.*",
when(col("df2.name").isNotNull(), lit(1)).otherwise(lit(0)).alias("validated")
).show()
#+-----------------+-------+-------+---------+
#| name| state|pincode|validated|
#+-----------------+-------+-------+---------+
#|CYBEX INTERNATION|HOUSTON| 00530| 0|
#| FLUID POWER| MEDWAY| 02053| 1|
#| REFINERY SYSTEMS| FRANCE| 072234| 0|
#| K N ENTERPRISES| MUMBAI| 100010| 0|
#+-----------------+-------+-------+---------+