如何有效地删除 Spark Dataframe 中的重复行,保持时间戳最高的行

How to efficiently remove duplicate rows in Spark Dataframe, keeping row with highest timestamp

我有一个从 Postgres 读取的大型数据集。它有一个 ID 列、一个时间戳列和其他几个可能已更新的列。对于每个 ID,我希望只保留最近更新的行(最高时间戳值)。我想出了一个可行的解决方案,但我担心它 (1.) 效率不高,并且 (2.) 可能不会对整个集合进行重复数据删除,而只会在 运行s 所在的每个分区上进行重复数据删除(因为这将 运行ning 在多节点集群上)。

下面是一些示例数据,基本上演示了我的技术:

名为 dfTest 的数据帧中的原始数据:

+---+--------+-----------+                                                      
| id|    city|update_time|
+---+--------+-----------+
|456|   Miami|   01:15:30|
|456| Seattle|   11:15:43|
|457| Toronto|   01:15:00|
|457| Chicago|   01:17:30|
|457|New York|   02:15:37|
|458|  Dallas|   01:18:35|
|459| Houston|   01:12:41|
|460| Chicago|   03:25:31|
|460|Montreal|   02:12:07|
|461|  Boston|   01:15:30|
+---+--------+-----------+

我将其放入临时视图中:

dfTest.createOrReplaceTempView("test")

然后我 运行 这个火花 SQL 查询:

    val query =
      s"""
         |select 
         |  id, 
         |  city, 
         |  update_time 
         |from (
         |  select 
         |    id, 
         |    city, 
         |    update_time, 
         |    row_number() over(partition by (id) order by update_time desc) as row_num 
         |    from test
         |) 
         |where row_num = 1
         |""".stripMargin
spark.sql(query).show()

这给出了正确的结果,每个 ID 只有一行:

+---+--------+-----------+
| id|    city|update_time|
+---+--------+-----------+
|456| Seattle|   11:15:43|
|457|New York|   02:15:37|
|458|  Dallas|   01:18:35|
|459| Houston|   01:12:41|
|460| Chicago|   03:25:31|
|461|  Boston|   01:15:30|
+---+--------+-----------+

我的问题是:

1.) 当 运行 在具有多个节点的集群上的大型数据集上时,我能否期望它仍然正常工作?

2.) 这是一种有效的方法吗?有没有办法使用 spark 函数而不是查询更有效地执行此操作?

Spark 尝试在分区内处理数据,但它可以在执行连接、聚合、window 函数等时随机播放数据。因此它将 运行 您的 SQL 正确.

并且它应该 可以高效地处理大型数据集,除非存在数据倾斜。如上所述,为了计算 window 函数,Spark 将打乱记录,以便相同的 ID 落在相同的分区上。如果您提供一个数据集,其中 50% 的记录的 id = 0,那么所有这些记录将被转储到单个执行程序中,这可能会导致麻烦。

最后,您可以使用 Spark 函数编写相同的代码,即。 Dataset API,但结果应该是等价的。

  1. 您可以期待您的查询工作; Spark 会小心地将您的数据在节点之间洗牌,以 运行 您的查询正确。

  2. 不确定它是否更高效,但使用 Spark 函数更易于维护。这是 spark-shell 中的示例。我通过 运行ning

    得到了输入
var dfTest = Seq(("456","Miami","01:15:30"),
("456","Seattle","11:15:43"),
("457","Toronto","01:15:00"),
("457","Chicago","01:17:30"),
("457","New York","02:15:37"),
("458","Dallas","01:18:35"),
("459","Houston","01:12:41"),
("460","Chicago","03:25:31"),
("460","Montreal","02:12:07"),
("461","  Boston","01:15:30")).toDF("id", "city", "update_time")

相当于您的 SQL 是

dfTest = dfTest.withColumn("rn", row_number().over(Window.partitionBy("id").orderBy(desc("update_time"))))
.select("id", "city", "update_time")
.filter(col("rn") === 1)

运行 dfTest.show 给出此输出

+---+--------+-----------+
| id|    city|update_time|
+---+--------+-----------+
|459| Houston|   01:12:41|
|458|  Dallas|   01:18:35|
|456| Seattle|   11:15:43|
|461|  Boston|   01:15:30|
|457|New York|   02:15:37|
|460| Chicago|   03:25:31|
+---+--------+-----------+