比较 pyspark 中的两个数据集

Compare two datasets in pyspark

我有 2 个数据集。

示例数据集 1:

id     |   model |   first_name   |      last_name
-----------------------------------------------------------
1234   |   32    |    456765      |   [456700,987565]
-----------------------------------------------------------
4539   |   20    |    123211      |   [893456,123456]
-----------------------------------------------------------

有时 first_name 和 last_name 列之一是空的。

示例数据集 2:

number  |  matricule   | name       |    model
----------------------------------------------------------
AA      |  0009        |  456765    |     32
----------------------------------------------------------
AA      |  0009        |  893456    |     32
----------------------------------------------------------
AA      |  0009        |  456700    |     32
----------------------------------------------------------
AA      |  0008        |  456700    |     32
----------------------------------------------------------
AA      |  0008        |  987565    |     32

对于一个 matricule,我们可以找到更多 namemodel,就像我上面的例子一样。 我应该做什么:

对于数据集 1 的每一行,我采用 3 列:模型、first_name 和 last_name 并在数据集 2 中查找它们(如果存在/根据矩阵元素匹配)。

我应该比较:

示例:

数据集 1 的第 1 行是:

id     |   model |   first_name   |      last_name
------------------------------------------------------
1234   |   32    |    456765      |   [456700,987565]

对于数据集 2 中的 matricule 0009,我有:

number  |  matricule   | name       |    model
----------------------------------------------------------
AA      |  0009        |  456765    |     32
----------------------------------------------------------
AA      |  0009        |  893456    |     32
----------------------------------------------------------
AA      |  0009        |  456700    |     32

所以:

first_name (456765) 存在于数据集 2 的名称中,当矩阵 =0009 ==> 不匹配时

last_name,只存在 456700 ==> 没有匹配项

数据集 2 的模型中存在模型 (32) ==> 匹配

所以我跳过了 matricule 0009。然后将数据集 1 中的第二行与 matricule 0008 的元素进行比较。

对于数据集 2 中的 matricule 0008,我有:

----------------------------------------------------------
AA      |  0008        |  456700    |     32
----------------------------------------------------------
AA      |  0008        |  987565    |     32

我们总是在数据集 1 的第一行:

first_name (456765) 当 matricule=0008 ==> match

时,数据集 2 的名称中不存在

last_name,当矩阵 = 0008 时,这两个值都存在于数据集 2 的名称中,==> match

当矩阵 =0008==> 匹配时,模型存在于数据集 2 的模型中

当我找到所有匹配项时,我创建了一个包含以下内容的新数据集:

number | id     |  matricule
-----------------------------------
AA     | 1234   | 0008
-----------------------------------

希望我说的很清楚。有人可以帮助我。

可以在匹配的条件下使用join。

首先,您可以按第二个 DataFrame 分组并将 name 列收集到列表中:

df2 = df2.groupBy("number", "model", "matricule").agg(collect_list("name").alias("names"))
f2.show(truncate=False)

#+------+-----+---------+------------------------+
#|number|model|matricule|names                   |
#+------+-----+---------+------------------------+
#|AA    |32   |0009     |[456765, 893456, 456700]|
#|AA    |32   |0008     |[456700, 987565]        |
#+------+-----+---------+------------------------+

现在,加入 df1df2。对于条件 1 和 2,检查起来比较简单。 对于第三个,您可以使用来自 Spark 2.4+ 的 array_exceptlast_name 列中不应有不在 names 中的元素,反之亦然):

join_condition = (col("df1.model") == col("df2.model")) \
                 & ~expr("array_contains(df2.names, df1.first_name)") \
                 & (size(expr("array_except(df2.names, df1.last_name)")) == lit(0)) \
                 & (size(expr("array_except(df1.last_name, df2.names)")) == lit(0))


df_result = df1.alias("df1").join(df2.alias("df2"), join_condition)

最后,select 个来自联接结果的所需列:

df_result.select("number", "id", "matricule").show(truncate=False)

#+------+----+---------+
#|number|id  |matricule|
#+------+----+---------+
#|AA    |1234|0008     |
#+------+----+---------+