从大数据集中删除模糊重复项

Remove fuzzy duplicates from big dataset

我有一个包含多条记录的 CSV 文件。每条记录代表某个人。它有 first_namelast_name 和其他详细信息。 我的目标是以巧妙的方式从此数据中删除重复项。记录来自不同的来源,副本可能包含不同的信息。 我在下面的示例中简化了问题:

df = spark.createDataFrame(
    [
        (1, "Anna", "Smyth", "01/03", "NY"), 
        (2, "Anna", "Smyth", "01/03", ""), 
        (3, "Anna", "Smyth", "01/03", "NY"),         
        (4, "Max", "Anderson", "12/04", "Boston"), 
        (5, "Max", "Anderson", "", "London"), 
        (6, "Max", "Anderson", "06/07", ""),         
        (7, "Sarah", "Nicolson", "02/09", ""), 
        (8, "Sarah", "Jonson", "", "Mexico"), 
        (9, "Sarah", "Jonson", "01/08", "Dallas"), 
    ],
    ("id", "first_name", "last_name", "birthday", "city")
)
df.show()

+---+----------+---------+--------+------+
| id|first_name|last_name|birthday|  city|
+---+----------+---------+--------+------+
|  1|      Anna|    Smyth|   01/03|    NY|
|  2|      Anna|    Smyth|   01/03|      |
|  3|      Anna|    Smyth|   01/03|    NY|
|  4|       Max| Anderson|   12/04|Boston|
|  5|       Max| Anderson|        |London|
|  6|       Max| Anderson|   06/07|      |
|  7|     Sarah| Nicolson|   02/09|      |
|  8|     Sarah|   Jonson|        |Mexico|
|  9|     Sarah|   Jonson|   01/08|Dallas|
+---+----------+---------+--------+------+

我想按 first_namelast_name 对记录进行分组,然后进行一些比较以考虑记录是否重复。 如果很少有记录具有相同的 first_namelast_name,我想检查 birthday,如果它等于 - 它是重复的。 如果一个或记录有 birthday 填充,另一个 - 没有,它是重复的。如果两个(或多个)记录都为空 birthday - 它是重复的。 相比之下,我忽略了 city 字段,但是在考虑重复项时,我想留下“最丰富”的记录,即填充了更多字段的记录。 如果记录具有相同的名称但不同的生日 - 它不是重复的。

比如上面,我想得到:

+---+----------+---------+--------+------+
| id|first_name|last_name|birthday|  city|
+---+----------+---------+--------+------+
|  1|      Anna|    Smyth|   01/03|    NY|
|  4|       Max| Anderson|   12/04|Boston|
|  6|       Max| Anderson|   06/07|      |
|  7|     Sarah| Nicolson|   02/09|      |
|  9|     Sarah|   Jonson|   01/08|Dallas|
+---+----------+---------+--------+------+

在实际问题中,我有更多的字段 - 大约 70 个,其中一些应该是必须匹配的,有些 - 不是。我需要处理的记录数——大约 1 亿条。 我正在考虑使用 pyspark,但欢迎任何技术

首先,您可以通过连接列 first_name#last_name#birthday 为每一行创建组列 grp,其中 birthday 被分区 [first_name、[=16 中的最大值替换=] 如果它为 null 或空字符串:

from pyspark.sql import Window
from pyspark.sql import functions as F

w1 = Window.partitionBy("first_name", "last_name").orderBy()

df1 = df.withColumn(
    "grp",
    F.concat_ws(
        "#",
        "first_name",
        "last_name",
        F.coalesce(F.expr("nullif(birthday, '')"), F.max("birthday").over(w1))
    )
).withColumn(
    "rich_columns",
    F.array(
        *[F.col(c) for c in df.columns if c not in ["id", "first_name", "last_name"]]
    )
)

df1.show(truncate=False)
#+---+----------+---------+--------+------+--------------------+---------------+
#|id |first_name|last_name|birthday|city  |grp                 |rich_columns   |
#+---+----------+---------+--------+------+--------------------+---------------+
#|1  |Anna      |Smyth    |01/03   |NY    |Anna#Smyth#01/03    |[01/03, NY]    |
#|2  |Anna      |Smyth    |01/03   |      |Anna#Smyth#01/03    |[01/03, ]      |
#|3  |Anna      |Smyth    |01/03   |NY    |Anna#Smyth#01/03    |[01/03, NY]    |
#|7  |Sarah     |Nicolson |02/09   |      |Sarah#Nicolson#02/09|[02/09, ]      |
#|8  |Sarah     |Jonson   |        |Mexico|Sarah#Jonson#01/08  |[, Mexico]     |
#|9  |Sarah     |Jonson   |01/08   |Dallas|Sarah#Jonson#01/08  |[01/08, Dallas]|
#|4  |Max       |Anderson |12/04   |Boston|Max#Anderson#12/04  |[12/04, Boston]|
#|5  |Max       |Anderson |        |London|Max#Anderson#12/04  |[, London]     |
#|6  |Max       |Anderson |06/07   |      |Max#Anderson#06/07  |[06/07, ]      |
#+---+----------+---------+--------+------+--------------------+---------------+

birthdaycity 也用于创建数组列 rich_columns,该列将用于确定最大信息不是 empty/null 的行的优先级。

然后,使用上面创建的组列执行行号并按 rich_columns 数组的大小排序:

w2 = Window.partitionBy("grp").orderBy(
    F.expr("size(filter(rich_columns, x -> nullif(x, '') is not null))").desc()
)

df2 = df1.withColumn("rn", F.row_number().over(w2)) \
    .filter("rn = 1") \
    .drop("grp", "rn", "rich_columns")

df2.show()

#+---+----------+---------+--------+------+
#| id|first_name|last_name|birthday|  city|
#+---+----------+---------+--------+------+
#|  7|     Sarah| Nicolson|   02/09|      |
#|  1|      Anna|    Smyth|   01/03|    NY|
#|  9|     Sarah|   Jonson|   01/08|Dallas|
#|  6|       Max| Anderson|   06/07|      |
#|  4|       Max| Anderson|   12/04|Boston|
#+---+----------+---------+--------+------+

在实际应用程序中,您应该在执行此操作之前将 last_namefirst_name 列转换为大写并去除重音符号(如果有)。