使用 Soundex 函数或 Levenshtein 距离模糊匹配 pyspark 或 SQL 中的字符串
Fuzzy matching a string in in pyspark or SQL using Soundex function or Levenshtein distance
当护照和国家相同时,我不得不在最后一列应用 Levenshtein 函数。
matrix = passport_heck.select(\
f.col('name_id').alias('name_id_1'),
f.col('last').alias('last_1'),
f.col('country').alias('country_1'),
f.col('passport').alias('passport_1')) \
.crossJoin(passport_heck.select(\
f.col('name_id').alias('name_id_2'),
f.col('last').alias('last_2'),
f.col('country').alias('country_2'),
f.col('passport').alias('passport_2')))\
.filter((f.col('passport_1') == f.col('passport_2')) & (f.col('country_1') == f.col('country_2')))```
res = matrix.withColumn('distance', levenshtein(f.col('last_1'), f.col('last_2')))
现在我得到以下输出,这完全没问题。
现在我需要删除重复对(例如 ID 558635 与 1106562 然后 1106562 与 558635 比较相同的内容)。
任何人都可以在 pyspark 中给我一些逻辑以低于 table。
如果您想解决问题,您的问题可能会变得相当复杂,但这里有一些 pyspark 中的示例代码,希望可以帮助您入门。
首先是一个小数据集,
tinydata = sqlContext.createDataFrame(
[
(3527524, 'aamir', 'al malik', 'aamir.almalik@gmail.com'),
(4287983, 'aamir', 'al malik', 'aamir.almalik@company.com'),
(200490, 'aamir', 'al malik', 'aamir.almalik@gmail.come'),
(1906639, 'tahir', 'al malik', 'tahir.almalik@gmail.com')
],
['ID', 'first_NAME', 'last_NAME', 'EMAIL']
)
然后你通过cross-join
把它转换成一个差分矩阵。请注意,如果您有 500 万,这将变得很大。您需要尽可能避免比较,例如关注对您问题的一些评论,以及您可能想到的其他想法。注意最后的过滤器是为了避免比较 2 行两次。
matrix = tinydata.select(F.col('ID').alias('ID1'), F.col('EMAIL').alias('EMAIL1')) \
.crossJoin(tinydata.select(F.col('ID').alias('ID2'), F.col('EMAIL').alias('EMAIL2'))) \
.filter(F.col('ID1') > F.col('ID2'))
之后,您可以计算距离。
def lev_dist(left, right):
return Levenshtein.distance(left, right)
lev_dist_udf = udf(lev_dist, IntegerType())
res = matrix.withColumn('d', lev_dist_udf(F.col('EMAIL1'), F.col('EMAIL2')))
通过这个小例子你得到
res.show()
+-------+--------------------+-------+--------------------+---+
| ID1| EMAIL1| ID2| EMAIL2| d|
+-------+--------------------+-------+--------------------+---+
|3527524|aamir.almalik@gma...| 200490|aamir.almalik@gma...| 1|
|3527524|aamir.almalik@gma...|1906639|tahir.almalik@gma...| 2|
|4287983|aamir.almalik@com...|3527524|aamir.almalik@gma...| 5|
|4287983|aamir.almalik@com...| 200490|aamir.almalik@gma...| 6|
|4287983|aamir.almalik@com...|1906639|tahir.almalik@gma...| 7|
|1906639|tahir.almalik@gma...| 200490|aamir.almalik@gma...| 3|
+-------+--------------------+-------+--------------------+---+
感谢您指出@cronoik
不需要udf,应该是这样的:
from pyspark.sql.functions import levenshtein
matrix = tinydata.select(F.col('ID').alias('ID1'), F.col('EMAIL').alias('EMAIL1')) \
.crossJoin(tinydata.select(F.col('ID').alias('ID2'), F.col('EMAIL').alias('EMAIL2'))) \
.filter(F.col('ID1') > F.col('ID2'))
res = matrix.withColumn('d', levenshtein(F.col('EMAIL1'), F.col('EMAIL2')))
当护照和国家相同时,我不得不在最后一列应用 Levenshtein 函数。
matrix = passport_heck.select(\
f.col('name_id').alias('name_id_1'),
f.col('last').alias('last_1'),
f.col('country').alias('country_1'),
f.col('passport').alias('passport_1')) \
.crossJoin(passport_heck.select(\
f.col('name_id').alias('name_id_2'),
f.col('last').alias('last_2'),
f.col('country').alias('country_2'),
f.col('passport').alias('passport_2')))\
.filter((f.col('passport_1') == f.col('passport_2')) & (f.col('country_1') == f.col('country_2')))```
res = matrix.withColumn('distance', levenshtein(f.col('last_1'), f.col('last_2')))
现在我得到以下输出,这完全没问题。
现在我需要删除重复对(例如 ID 558635 与 1106562 然后 1106562 与 558635 比较相同的内容)。
任何人都可以在 pyspark 中给我一些逻辑以低于 table。
如果您想解决问题,您的问题可能会变得相当复杂,但这里有一些 pyspark 中的示例代码,希望可以帮助您入门。
首先是一个小数据集,
tinydata = sqlContext.createDataFrame(
[
(3527524, 'aamir', 'al malik', 'aamir.almalik@gmail.com'),
(4287983, 'aamir', 'al malik', 'aamir.almalik@company.com'),
(200490, 'aamir', 'al malik', 'aamir.almalik@gmail.come'),
(1906639, 'tahir', 'al malik', 'tahir.almalik@gmail.com')
],
['ID', 'first_NAME', 'last_NAME', 'EMAIL']
)
然后你通过cross-join
把它转换成一个差分矩阵。请注意,如果您有 500 万,这将变得很大。您需要尽可能避免比较,例如关注对您问题的一些评论,以及您可能想到的其他想法。注意最后的过滤器是为了避免比较 2 行两次。
matrix = tinydata.select(F.col('ID').alias('ID1'), F.col('EMAIL').alias('EMAIL1')) \
.crossJoin(tinydata.select(F.col('ID').alias('ID2'), F.col('EMAIL').alias('EMAIL2'))) \
.filter(F.col('ID1') > F.col('ID2'))
之后,您可以计算距离。
def lev_dist(left, right):
return Levenshtein.distance(left, right)
lev_dist_udf = udf(lev_dist, IntegerType())
res = matrix.withColumn('d', lev_dist_udf(F.col('EMAIL1'), F.col('EMAIL2')))
通过这个小例子你得到
res.show()
+-------+--------------------+-------+--------------------+---+
| ID1| EMAIL1| ID2| EMAIL2| d|
+-------+--------------------+-------+--------------------+---+
|3527524|aamir.almalik@gma...| 200490|aamir.almalik@gma...| 1|
|3527524|aamir.almalik@gma...|1906639|tahir.almalik@gma...| 2|
|4287983|aamir.almalik@com...|3527524|aamir.almalik@gma...| 5|
|4287983|aamir.almalik@com...| 200490|aamir.almalik@gma...| 6|
|4287983|aamir.almalik@com...|1906639|tahir.almalik@gma...| 7|
|1906639|tahir.almalik@gma...| 200490|aamir.almalik@gma...| 3|
+-------+--------------------+-------+--------------------+---+
感谢您指出@cronoik
不需要udf,应该是这样的:
from pyspark.sql.functions import levenshtein
matrix = tinydata.select(F.col('ID').alias('ID1'), F.col('EMAIL').alias('EMAIL1')) \
.crossJoin(tinydata.select(F.col('ID').alias('ID2'), F.col('EMAIL').alias('EMAIL2'))) \
.filter(F.col('ID1') > F.col('ID2'))
res = matrix.withColumn('d', levenshtein(F.col('EMAIL1'), F.col('EMAIL2')))