比较 pyspark 中的两个数据集
Compare two datasets in pyspark
我有 2 个数据集。
示例数据集 1:
id | model | first_name | last_name
-----------------------------------------------------------
1234 | 32 | 456765 | [456700,987565]
-----------------------------------------------------------
4539 | 20 | 123211 | [893456,123456]
-----------------------------------------------------------
有时 first_name 和 last_name 列之一是空的。
示例数据集 2:
number | matricule | name | model
----------------------------------------------------------
AA | 0009 | 456765 | 32
----------------------------------------------------------
AA | 0009 | 893456 | 32
----------------------------------------------------------
AA | 0009 | 456700 | 32
----------------------------------------------------------
AA | 0008 | 456700 | 32
----------------------------------------------------------
AA | 0008 | 987565 | 32
对于一个 matricule
,我们可以找到更多 name
和 model
,就像我上面的例子一样。
我应该做什么:
对于数据集 1 的每一行,我采用 3 列:模型、first_name 和 last_name 并在数据集 2 中查找它们(如果存在/根据矩阵元素匹配)。
我应该比较:
逐个模型 ==> 如果模型(数据集 1)存在于模型(数据集 2)中 ==> 匹配
如果名称中存在 first_name ==> 不匹配。如果 first_name 在名称中不存在 ==> 匹配
如果名称中存在 last_name ==> 匹配。当我有两个值 last_name 时,两者都应该存在于要匹配的数据集 2 的名称中。
示例:
数据集 1 的第 1 行是:
id | model | first_name | last_name
------------------------------------------------------
1234 | 32 | 456765 | [456700,987565]
对于数据集 2 中的 matricule 0009,我有:
number | matricule | name | model
----------------------------------------------------------
AA | 0009 | 456765 | 32
----------------------------------------------------------
AA | 0009 | 893456 | 32
----------------------------------------------------------
AA | 0009 | 456700 | 32
所以:
first_name (456765) 存在于数据集 2 的名称中,当矩阵 =0009 ==> 不匹配时
last_name,只存在 456700 ==> 没有匹配项
数据集 2 的模型中存在模型 (32) ==> 匹配
所以我跳过了 matricule 0009。然后将数据集 1 中的第二行与 matricule 0008 的元素进行比较。
对于数据集 2 中的 matricule 0008,我有:
----------------------------------------------------------
AA | 0008 | 456700 | 32
----------------------------------------------------------
AA | 0008 | 987565 | 32
我们总是在数据集 1 的第一行:
first_name (456765) 当 matricule=0008 ==> match
时,数据集 2 的名称中不存在
last_name,当矩阵 = 0008 时,这两个值都存在于数据集 2 的名称中,==> match
当矩阵 =0008==> 匹配时,模型存在于数据集 2 的模型中
当我找到所有匹配项时,我创建了一个包含以下内容的新数据集:
number | id | matricule
-----------------------------------
AA | 1234 | 0008
-----------------------------------
希望我说的很清楚。有人可以帮助我。
可以在匹配的条件下使用join。
首先,您可以按第二个 DataFrame 分组并将 name
列收集到列表中:
df2 = df2.groupBy("number", "model", "matricule").agg(collect_list("name").alias("names"))
f2.show(truncate=False)
#+------+-----+---------+------------------------+
#|number|model|matricule|names |
#+------+-----+---------+------------------------+
#|AA |32 |0009 |[456765, 893456, 456700]|
#|AA |32 |0008 |[456700, 987565] |
#+------+-----+---------+------------------------+
现在,加入 df1
和 df2
。对于条件 1 和 2,检查起来比较简单。
对于第三个,您可以使用来自 Spark 2.4+ 的 array_except
(last_name
列中不应有不在 names
中的元素,反之亦然):
join_condition = (col("df1.model") == col("df2.model")) \
& ~expr("array_contains(df2.names, df1.first_name)") \
& (size(expr("array_except(df2.names, df1.last_name)")) == lit(0)) \
& (size(expr("array_except(df1.last_name, df2.names)")) == lit(0))
df_result = df1.alias("df1").join(df2.alias("df2"), join_condition)
最后,select 个来自联接结果的所需列:
df_result.select("number", "id", "matricule").show(truncate=False)
#+------+----+---------+
#|number|id |matricule|
#+------+----+---------+
#|AA |1234|0008 |
#+------+----+---------+
我有 2 个数据集。
示例数据集 1:
id | model | first_name | last_name
-----------------------------------------------------------
1234 | 32 | 456765 | [456700,987565]
-----------------------------------------------------------
4539 | 20 | 123211 | [893456,123456]
-----------------------------------------------------------
有时 first_name 和 last_name 列之一是空的。
示例数据集 2:
number | matricule | name | model
----------------------------------------------------------
AA | 0009 | 456765 | 32
----------------------------------------------------------
AA | 0009 | 893456 | 32
----------------------------------------------------------
AA | 0009 | 456700 | 32
----------------------------------------------------------
AA | 0008 | 456700 | 32
----------------------------------------------------------
AA | 0008 | 987565 | 32
对于一个 matricule
,我们可以找到更多 name
和 model
,就像我上面的例子一样。
我应该做什么:
对于数据集 1 的每一行,我采用 3 列:模型、first_name 和 last_name 并在数据集 2 中查找它们(如果存在/根据矩阵元素匹配)。
我应该比较:
逐个模型 ==> 如果模型(数据集 1)存在于模型(数据集 2)中 ==> 匹配
如果名称中存在 first_name ==> 不匹配。如果 first_name 在名称中不存在 ==> 匹配
如果名称中存在 last_name ==> 匹配。当我有两个值 last_name 时,两者都应该存在于要匹配的数据集 2 的名称中。
示例:
数据集 1 的第 1 行是:
id | model | first_name | last_name
------------------------------------------------------
1234 | 32 | 456765 | [456700,987565]
对于数据集 2 中的 matricule 0009,我有:
number | matricule | name | model
----------------------------------------------------------
AA | 0009 | 456765 | 32
----------------------------------------------------------
AA | 0009 | 893456 | 32
----------------------------------------------------------
AA | 0009 | 456700 | 32
所以:
first_name (456765) 存在于数据集 2 的名称中,当矩阵 =0009 ==> 不匹配时
last_name,只存在 456700 ==> 没有匹配项
数据集 2 的模型中存在模型 (32) ==> 匹配
所以我跳过了 matricule 0009。然后将数据集 1 中的第二行与 matricule 0008 的元素进行比较。
对于数据集 2 中的 matricule 0008,我有:
----------------------------------------------------------
AA | 0008 | 456700 | 32
----------------------------------------------------------
AA | 0008 | 987565 | 32
我们总是在数据集 1 的第一行:
first_name (456765) 当 matricule=0008 ==> match
时,数据集 2 的名称中不存在last_name,当矩阵 = 0008 时,这两个值都存在于数据集 2 的名称中,==> match
当矩阵 =0008==> 匹配时,模型存在于数据集 2 的模型中
当我找到所有匹配项时,我创建了一个包含以下内容的新数据集:
number | id | matricule
-----------------------------------
AA | 1234 | 0008
-----------------------------------
希望我说的很清楚。有人可以帮助我。
可以在匹配的条件下使用join。
首先,您可以按第二个 DataFrame 分组并将 name
列收集到列表中:
df2 = df2.groupBy("number", "model", "matricule").agg(collect_list("name").alias("names"))
f2.show(truncate=False)
#+------+-----+---------+------------------------+
#|number|model|matricule|names |
#+------+-----+---------+------------------------+
#|AA |32 |0009 |[456765, 893456, 456700]|
#|AA |32 |0008 |[456700, 987565] |
#+------+-----+---------+------------------------+
现在,加入 df1
和 df2
。对于条件 1 和 2,检查起来比较简单。
对于第三个,您可以使用来自 Spark 2.4+ 的 array_except
(last_name
列中不应有不在 names
中的元素,反之亦然):
join_condition = (col("df1.model") == col("df2.model")) \
& ~expr("array_contains(df2.names, df1.first_name)") \
& (size(expr("array_except(df2.names, df1.last_name)")) == lit(0)) \
& (size(expr("array_except(df1.last_name, df2.names)")) == lit(0))
df_result = df1.alias("df1").join(df2.alias("df2"), join_condition)
最后,select 个来自联接结果的所需列:
df_result.select("number", "id", "matricule").show(truncate=False)
#+------+----+---------+
#|number|id |matricule|
#+------+----+---------+
#|AA |1234|0008 |
#+------+----+---------+