How to union two tables and remove duplicates with conditions using Spark
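Suppose we have two tables, A and B, whose columns are country, name, age. I want to create a union table from A and B, called result, such that result contains unique (country, name) pairs, with rows from A taking priority.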
For example, suppose this is table A:
val column = Seq("country","name", "age")
val A = Seq(("New York","Smith","10"),
("Washington","Rose","5"),
("Mexico","David","1"))
val aDF = A.toDF(column:_*)
aDF.show(false)
+----------+-----+---+
|country |name |age|
+----------+-----+---+
|New York |Smith|10 |
|Washington|Rose |5 |
|Mexico |David|1 |
+----------+-----+---+
This is table B:
val B = Seq(("New York","Smith","5"),
("Florida","Smith","5"),
("Washington","Jef","5"),
("Russia","Boris","12"))
val bDF = B.toDF(column:_*)
bDF.show(false)
+----------+-----+---+
|country |name |age|
+----------+-----+---+
|New York |Smith|5 | // Should not be included in the result table
|Florida |Smith|5 |
|Washington|Jef |5 |
|Russia |Boris|12 |
+----------+-----+---+
The result table would be:
+----------+-----+---+
|country |name |age|
+----------+-----+---+
|New York |Smith|10 | // <New York, Smith> contained in A and B - we take from A
|Washington|Rose |5 | // Contained in A
|Mexico |David|1 | // Contained in A
|Florida |Smith|5 | // Contained in B
|Washington|Jef |5 | // Contained in B
|Russia |Boris|12 | // Contained in B
+----------+-----+---+
How can I do this using Spark?
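You can add a new column source, with literal values 1 and 2, to the DataFrames aDF and bDF respectively before the union. Then order by this column in a row_number Window function to eliminate the duplicate rows, so that the row from aDF is kept whenever both tables contain the same (country, name) pair: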
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lit, row_number}

// Within each (country, name) group, rows from aDF (source = 1) sort first.
val w = Window.partitionBy("country", "name").orderBy("source")
val result = aDF.withColumn("source", lit(1))
  .union(bDF.withColumn("source", lit(2)))
  .withColumn("rn", row_number().over(w))
  .filter("rn = 1")
  .drop("rn", "source")
result.show
//+----------+-----+---+
//| country| name|age|
//+----------+-----+---+
//| Florida|Smith| 5|
//|Washington| Rose| 5|
//| Mexico|David| 1|
//|Washington| Jef| 5|
//| Russia|Boris| 12|
//| New York|Smith| 10|
//+----------+-----+---+
A full join may be more efficient than union + window:
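import org.apache.spark.sql.functions.{coalesce, col}

// Take age from aDF when the key exists there, otherwise fall back to bDF.
aDF.join(bDF, Seq("country", "name"), "full")
  .select(col("country"), col("name"), coalesce(aDF("age"), bDF("age")).as("age"))
  .show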
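A third option, which is not taken from the answers above, is a left anti join: keep every row of A and append only the rows of B whose (country, name) key does not appear in A. The following is a minimal sketch under the assumption that each (country, name) pair occurs at most once within each table; the local SparkSession setup and the name onlyInB are illustrative and not part of the original question.

```scala
import org.apache.spark.sql.SparkSession

// Local session for experimentation (assumption); in spark-shell `spark` already exists.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("union-prefer-a")
  .getOrCreate()
import spark.implicits._

val columns = Seq("country", "name", "age")
val aDF = Seq(
  ("New York", "Smith", "10"),
  ("Washington", "Rose", "5"),
  ("Mexico", "David", "1")
).toDF(columns: _*)
val bDF = Seq(
  ("New York", "Smith", "5"),
  ("Florida", "Smith", "5"),
  ("Washington", "Jef", "5"),
  ("Russia", "Boris", "12")
).toDF(columns: _*)

// Rows of B whose (country, name) key has no match in A.
// A left anti join returns only the left side's columns.
val onlyInB = bDF.join(aDF, Seq("country", "name"), "left_anti")

// All of A, plus the B rows that A does not cover.
val result = aDF.union(onlyInB)
result.show(false)
```

Unlike the row_number approach, this does not deduplicate keys that repeat inside A or inside B, so it only fits when each table is already unique on (country, name).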