从大数据集中删除模糊重复项
Remove fuzzy duplicates from big dataset
我有一个包含多条记录的 CSV 文件。每条记录代表某个人。它有 first_name
、last_name
和其他详细信息。
我的目标是以巧妙的方式从此数据中删除重复项。记录来自不同的来源,副本可能包含不同的信息。
我在下面的示例中简化了问题:
df = spark.createDataFrame(
[
(1, "Anna", "Smyth", "01/03", "NY"),
(2, "Anna", "Smyth", "01/03", ""),
(3, "Anna", "Smyth", "01/03", "NY"),
(4, "Max", "Anderson", "12/04", "Boston"),
(5, "Max", "Anderson", "", "London"),
(6, "Max", "Anderson", "06/07", ""),
(7, "Sarah", "Nicolson", "02/09", ""),
(8, "Sarah", "Jonson", "", "Mexico"),
(9, "Sarah", "Jonson", "01/08", "Dallas"),
],
("id", "first_name", "last_name", "birthday", "city")
)
df.show()
+---+----------+---------+--------+------+
| id|first_name|last_name|birthday| city|
+---+----------+---------+--------+------+
| 1| Anna| Smyth| 01/03| NY|
| 2| Anna| Smyth| 01/03| |
| 3| Anna| Smyth| 01/03| NY|
| 4| Max| Anderson| 12/04|Boston|
| 5| Max| Anderson| |London|
| 6| Max| Anderson| 06/07| |
| 7| Sarah| Nicolson| 02/09| |
| 8| Sarah| Jonson| |Mexico|
| 9| Sarah| Jonson| 01/08|Dallas|
+---+----------+---------+--------+------+
我想按 first_name
和 last_name
对记录进行分组,然后进行一些比较以考虑记录是否重复。
如果很少有记录具有相同的 first_name
和 last_name
,我想检查 birthday
,如果它等于 - 它是重复的。
如果一个或记录有 birthday
填充,另一个 - 没有,它是重复的。如果两个(或多个)记录都为空 birthday
- 它是重复的。
相比之下,我忽略了 city
字段,但是在考虑重复项时,我想留下“最丰富”的记录,即填充了更多字段的记录。
如果记录具有相同的名称但不同的生日 - 它不是重复的。
比如上面,我想得到:
+---+----------+---------+--------+------+
| id|first_name|last_name|birthday| city|
+---+----------+---------+--------+------+
| 1| Anna| Smyth| 01/03| NY|
| 4| Max| Anderson| 12/04|Boston|
| 6| Max| Anderson| 06/07| |
| 7| Sarah| Nicolson| 02/09| |
| 9| Sarah| Jonson| 01/08|Dallas|
+---+----------+---------+--------+------+
在实际问题中,我有更多的字段 - 大约 70 个,其中一些应该是必须匹配的,有些 - 不是。我需要处理的记录数——大约 1 亿条。
我正在考虑使用 pyspark,但欢迎任何技术
首先,您可以通过连接列 first_name#last_name#birthday
为每一行创建组列 grp
,其中 birthday
被分区 [first_name
、[=16 中的最大值替换=] 如果它为 null 或空字符串:
from pyspark.sql import Window
from pyspark.sql import functions as F
w1 = Window.partitionBy("first_name", "last_name").orderBy()
df1 = df.withColumn(
"grp",
F.concat_ws(
"#",
"first_name",
"last_name",
F.coalesce(F.expr("nullif(birthday, '')"), F.max("birthday").over(w1))
)
).withColumn(
"rich_columns",
F.array(
*[F.col(c) for c in df.columns if c not in ["id", "first_name", "last_name"]]
)
)
df1.show(truncate=False)
#+---+----------+---------+--------+------+--------------------+---------------+
#|id |first_name|last_name|birthday|city |grp |rich_columns |
#+---+----------+---------+--------+------+--------------------+---------------+
#|1 |Anna |Smyth |01/03 |NY |Anna#Smyth#01/03 |[01/03, NY] |
#|2 |Anna |Smyth |01/03 | |Anna#Smyth#01/03 |[01/03, ] |
#|3 |Anna |Smyth |01/03 |NY |Anna#Smyth#01/03 |[01/03, NY] |
#|7 |Sarah |Nicolson |02/09 | |Sarah#Nicolson#02/09|[02/09, ] |
#|8 |Sarah |Jonson | |Mexico|Sarah#Jonson#01/08 |[, Mexico] |
#|9 |Sarah |Jonson |01/08 |Dallas|Sarah#Jonson#01/08 |[01/08, Dallas]|
#|4 |Max |Anderson |12/04 |Boston|Max#Anderson#12/04 |[12/04, Boston]|
#|5 |Max |Anderson | |London|Max#Anderson#12/04 |[, London] |
#|6 |Max |Anderson |06/07 | |Max#Anderson#06/07 |[06/07, ] |
#+---+----------+---------+--------+------+--------------------+---------------+
列 birthday
和 city
也用于创建数组列 rich_columns
,该列将用于确定最大信息不是 empty/null 的行的优先级。
然后,使用上面创建的组列执行行号并按 rich_columns
数组的大小排序:
w2 = Window.partitionBy("grp").orderBy(
F.expr("size(filter(rich_columns, x -> nullif(x, '') is not null))").desc()
)
df2 = df1.withColumn("rn", F.row_number().over(w2)) \
.filter("rn = 1") \
.drop("grp", "rn", "rich_columns")
df2.show()
#+---+----------+---------+--------+------+
#| id|first_name|last_name|birthday| city|
#+---+----------+---------+--------+------+
#| 7| Sarah| Nicolson| 02/09| |
#| 1| Anna| Smyth| 01/03| NY|
#| 9| Sarah| Jonson| 01/08|Dallas|
#| 6| Max| Anderson| 06/07| |
#| 4| Max| Anderson| 12/04|Boston|
#+---+----------+---------+--------+------+
在实际应用程序中,您应该在执行此操作之前将 last_name
、first_name
列转换为大写并去除重音符号(如果有)。
我有一个包含多条记录的 CSV 文件。每条记录代表某个人。它有 first_name
、last_name
和其他详细信息。
我的目标是以巧妙的方式从此数据中删除重复项。记录来自不同的来源,副本可能包含不同的信息。
我在下面的示例中简化了问题:
df = spark.createDataFrame(
[
(1, "Anna", "Smyth", "01/03", "NY"),
(2, "Anna", "Smyth", "01/03", ""),
(3, "Anna", "Smyth", "01/03", "NY"),
(4, "Max", "Anderson", "12/04", "Boston"),
(5, "Max", "Anderson", "", "London"),
(6, "Max", "Anderson", "06/07", ""),
(7, "Sarah", "Nicolson", "02/09", ""),
(8, "Sarah", "Jonson", "", "Mexico"),
(9, "Sarah", "Jonson", "01/08", "Dallas"),
],
("id", "first_name", "last_name", "birthday", "city")
)
df.show()
+---+----------+---------+--------+------+
| id|first_name|last_name|birthday| city|
+---+----------+---------+--------+------+
| 1| Anna| Smyth| 01/03| NY|
| 2| Anna| Smyth| 01/03| |
| 3| Anna| Smyth| 01/03| NY|
| 4| Max| Anderson| 12/04|Boston|
| 5| Max| Anderson| |London|
| 6| Max| Anderson| 06/07| |
| 7| Sarah| Nicolson| 02/09| |
| 8| Sarah| Jonson| |Mexico|
| 9| Sarah| Jonson| 01/08|Dallas|
+---+----------+---------+--------+------+
我想按 first_name
和 last_name
对记录进行分组,然后进行一些比较以考虑记录是否重复。
如果很少有记录具有相同的 first_name
和 last_name
,我想检查 birthday
,如果它等于 - 它是重复的。
如果一个或记录有 birthday
填充,另一个 - 没有,它是重复的。如果两个(或多个)记录都为空 birthday
- 它是重复的。
相比之下,我忽略了 city
字段,但是在考虑重复项时,我想留下“最丰富”的记录,即填充了更多字段的记录。
如果记录具有相同的名称但不同的生日 - 它不是重复的。
比如上面,我想得到:
+---+----------+---------+--------+------+
| id|first_name|last_name|birthday| city|
+---+----------+---------+--------+------+
| 1| Anna| Smyth| 01/03| NY|
| 4| Max| Anderson| 12/04|Boston|
| 6| Max| Anderson| 06/07| |
| 7| Sarah| Nicolson| 02/09| |
| 9| Sarah| Jonson| 01/08|Dallas|
+---+----------+---------+--------+------+
在实际问题中,我有更多的字段 - 大约 70 个,其中一些应该是必须匹配的,有些 - 不是。我需要处理的记录数——大约 1 亿条。 我正在考虑使用 pyspark,但欢迎任何技术
首先,您可以通过连接列 first_name#last_name#birthday
为每一行创建组列 grp
,其中 birthday
被分区 [first_name
、[=16 中的最大值替换=] 如果它为 null 或空字符串:
from pyspark.sql import Window
from pyspark.sql import functions as F
w1 = Window.partitionBy("first_name", "last_name").orderBy()
df1 = df.withColumn(
"grp",
F.concat_ws(
"#",
"first_name",
"last_name",
F.coalesce(F.expr("nullif(birthday, '')"), F.max("birthday").over(w1))
)
).withColumn(
"rich_columns",
F.array(
*[F.col(c) for c in df.columns if c not in ["id", "first_name", "last_name"]]
)
)
df1.show(truncate=False)
#+---+----------+---------+--------+------+--------------------+---------------+
#|id |first_name|last_name|birthday|city |grp |rich_columns |
#+---+----------+---------+--------+------+--------------------+---------------+
#|1 |Anna |Smyth |01/03 |NY |Anna#Smyth#01/03 |[01/03, NY] |
#|2 |Anna |Smyth |01/03 | |Anna#Smyth#01/03 |[01/03, ] |
#|3 |Anna |Smyth |01/03 |NY |Anna#Smyth#01/03 |[01/03, NY] |
#|7 |Sarah |Nicolson |02/09 | |Sarah#Nicolson#02/09|[02/09, ] |
#|8 |Sarah |Jonson | |Mexico|Sarah#Jonson#01/08 |[, Mexico] |
#|9 |Sarah |Jonson |01/08 |Dallas|Sarah#Jonson#01/08 |[01/08, Dallas]|
#|4 |Max |Anderson |12/04 |Boston|Max#Anderson#12/04 |[12/04, Boston]|
#|5 |Max |Anderson | |London|Max#Anderson#12/04 |[, London] |
#|6 |Max |Anderson |06/07 | |Max#Anderson#06/07 |[06/07, ] |
#+---+----------+---------+--------+------+--------------------+---------------+
列 birthday
和 city
也用于创建数组列 rich_columns
,该列将用于确定最大信息不是 empty/null 的行的优先级。
然后,使用上面创建的组列执行行号并按 rich_columns
数组的大小排序:
w2 = Window.partitionBy("grp").orderBy(
F.expr("size(filter(rich_columns, x -> nullif(x, '') is not null))").desc()
)
df2 = df1.withColumn("rn", F.row_number().over(w2)) \
.filter("rn = 1") \
.drop("grp", "rn", "rich_columns")
df2.show()
#+---+----------+---------+--------+------+
#| id|first_name|last_name|birthday| city|
#+---+----------+---------+--------+------+
#| 7| Sarah| Nicolson| 02/09| |
#| 1| Anna| Smyth| 01/03| NY|
#| 9| Sarah| Jonson| 01/08|Dallas|
#| 6| Max| Anderson| 06/07| |
#| 4| Max| Anderson| 12/04|Boston|
#+---+----------+---------+--------+------+
在实际应用程序中,您应该在执行此操作之前将 last_name
、first_name
列转换为大写并去除重音符号(如果有)。