如何根据 spark 数据框中的某些列过滤掉重复的行?

How to filter out duplicate rows based on some columns in spark dataframe?

假设,我有一个如下所示的数据框:

在这里,您可以看到交易编号 1、2 和 3 在 A、B、C 列中具有相同的值,但在 D 和 E 列中具有不同的值。E 列具有日期条目。

  1. 对于相同的 A、B 和 C 组合 (A=1,B=1,C=1),我们有 3 行。我想根据 E 列的最近交易日期只取一行,这意味着具有最新日期的行。但是对于最近的日期,有 2 笔交易。但是,如果在 E 列中找到 A、B、C 和最近日期的相同组合的两行或更多行,我只想取其中之一。 所以我对这个组合的预期输出将是行号 3 或 4(任何一个都可以)。
  2. 对于相同的 A、B 和 C 组合 (A=2,B=2,C=2),我们有 2 行。但是根据 E 列,最近的日期是第 5 行的日期。所以我们将只用这一行来表示 A、B 和 C 的组合。 所以我对这个组合的预期输出将是行号 5

所以最终输出将是(3和5)(4和5)

现在应该怎么处理:

  1. 我读了这个:

Both reduceByKey and groupByKey can be used for the same purposes but reduceByKey works much better on a large dataset. That’s because Spark knows it can combine output with a common key on each partition before shuffling the data.

  1. 我尝试在 A、B、C 列上使用 groupBy,在 E 列上使用 max。但是如果同一日期出现多行,它无法提供行首。

解决此问题的最优化方法是什么?在此先感谢。

编辑: 我需要取回过滤后的交易。还有怎么做?

Link 可以通过几个步骤。聚合数据框:

val agregatedDF=initialDF.select("A","B","C","E").groupBy("A","B","C").agg(max("E").as("E_max"))

Link 初始聚合:

initialDF.join(agregatedDF, List("A","B","C"))

如果初始DataFrame来自Hive,都可以简化。

val initialDF = Seq((1,1,1,1,"2/28/2017 0:00"),(1,1,1,2,"3/1/2017 0:00"),
(1,1,1,3,"3/1/2017 0:00"),(2,2,2,1,"2/28/2017 0:00"),(2,2,2,2,"2/25/20170:00")) 

这将错过相应的 col(D)

initialDF
.toDS.groupBy("_1","_2","_3")
.agg(max(col("_5"))).show 

如果你想要最大col的相应colD:

 initialDF.toDS.map(x=>x._1,x._2,x._3,x._5,x._4))).groupBy("_1","_2","_3")
.agg(max(col("_4")).as("_4")).select(col("_1"),col("_2"),col("_3"),col("_4._2"),col("_4._1")).show

对于 ReduceByKey,您可以将数据集转换为 pairRDD,然后处理 it.Should 更快,以防 Catalyst 无法优化第一个中的 groupByKey。参考

我已经使用 spark window functions 得到我的解决方案:

 val window = Window
      .partitionBy(dataframe("A"), dataframe("B"),dataframe("C"))
      .orderBy(dataframe("E") desc)

 val dfWithRowNumber = dataframe.withColumn("row_number", row_number() over window)
 val filteredDf = dfWithRowNumber.filter(dfWithRowNumber("row_number") === 1)