如何使用 Spark 合并两个表并根据条件删除重复项

How to union two tables and remove duplicates with conditions using Spark

假设我们有两个 tables - AB,它们的列键是 country, name, age。我想从 AB 创建一个联合 table,称为 result,这样 result table 将是唯一的国家和名字对,优先选择 A.

例如,假设这是 table A:

val column = Seq("country","name", "age")
val A = Seq(("New York","Smith","10"),
      ("Washington","Rose","5"),
      ("Mexico","David","1"))
val aDF = A.toDF(column:_*)
aDF.show(false)

+----------+-----+---+
|country   |name |age|
+----------+-----+---+
|New York  |Smith|10 |
|Washington|Rose |5  |
|Mexico    |David|1  |
+----------+-----+---+

这是table B:

val B = Seq(("New York","Smith","5"),
    ("Florida","Smith","5"),
    ("Washington","Jef","5"),
    ("Russia","Boris","12"))

val bDF = B.toDF(column:_*)
bDF.show(false)

+----------+-----+---+
|country   |name |age|
+----------+-----+---+
|New York  |Smith|5  |.  // Should not be included in the result table
|Florida   |Smith|5  |
|Washington|Jef  |5  |
|Russia    |Boris|12 |
+----------+-----+---+

结果table将是-

+----------+-----+---+
|country   |name |age|
+----------+-----+---+
|New York  |Smith|10 |  // <New York, Smith> contained in A and B - we take from A
|Washington|Rose |5  |. // Contained in A
|Mexico    |David|1  |. // Contained in A
|Florida   |Smith|5  |. // Contained in B
|Washington|Jef  |5  |. // Contained in B
|Russia    |Boris|12 |. // Contained in B
+----------+-----+---+

我如何使用 spark 做到这一点?

您可以在合并操作之前在数据帧 aDFbDF 中分别添加文字值为 12 的新列 source。然后,使用此列进行自定义排序以使用 row_number Window 函数消除重复行:

import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy("country", "name").orderBy("source")

val result = aDF.withColumn("source", lit(1))
  .union(bDF.withColumn("source", lit(2)))
  .withColumn("rn", row_number().over(w))
  .filter("rn = 1")
  .drop("rn", "source")

result.show

//+----------+-----+---+
//|   country| name|age|
//+----------+-----+---+
//|   Florida|Smith|  5|
//|Washington| Rose|  5|
//|    Mexico|David|  1|
//|Washington|  Jef|  5|
//|    Russia|Boris| 12|
//|  New York|Smith| 10|
//+----------+-----+---+

完全连接可能比 union + window:

更有效
aDF.join(bDF, Seq("country", "name"), "full")
.withColumn("age", coalesce(aDF("age"), bDF("age")))
.drop(aDF("age")).drop(bDF("age"))
.show