如何根据 spark 数据框中的某些列过滤掉重复的行?
How to filter out duplicate rows based on some columns in spark dataframe?
假设,我有一个如下所示的数据框:
在这里,您可以看到交易编号 1、2 和 3 在 A、B、C 列中具有相同的值,但在 D 和 E 列中具有不同的值。E 列具有日期条目。
- 对于相同的 A、B 和 C 组合 (A=1,B=1,C=1),我们有 3 行。我想根据 E 列的最近交易日期只取一行,这意味着具有最新日期的行。但是对于最近的日期,有 2 笔交易。但是,如果在 E 列中找到 A、B、C 和最近日期的相同组合的两行或更多行,我只想取其中之一。
所以我对这个组合的预期输出将是行号 3 或 4(任何一个都可以)。
- 对于相同的 A、B 和 C 组合 (A=2,B=2,C=2),我们有 2 行。但是根据 E 列,最近的日期是第 5 行的日期。所以我们将只用这一行来表示 A、B 和 C 的组合。
所以我对这个组合的预期输出将是行号 5
所以最终输出将是(3和5)或(4和5)。
现在应该怎么处理:
- 我读了这个:
Both reduceByKey and groupByKey can be used for the same purposes but
reduceByKey works much better on a large dataset. That’s because Spark
knows it can combine output with a common key on each partition before
shuffling the data.
- 我尝试在 A、B、C 列上使用 groupBy,在 E 列上使用 max。但是如果同一日期出现多行,它无法提供行首。
解决此问题的最优化方法是什么?在此先感谢。
编辑: 我需要取回过滤后的交易。还有怎么做?
Link 可以通过几个步骤。聚合数据框:
val agregatedDF=initialDF.select("A","B","C","E").groupBy("A","B","C").agg(max("E").as("E_max"))
Link 初始聚合:
initialDF.join(agregatedDF, List("A","B","C"))
如果初始DataFrame来自Hive,都可以简化。
val initialDF = Seq((1,1,1,1,"2/28/2017 0:00"),(1,1,1,2,"3/1/2017 0:00"),
(1,1,1,3,"3/1/2017 0:00"),(2,2,2,1,"2/28/2017 0:00"),(2,2,2,2,"2/25/20170:00"))
这将错过相应的 col(D)
initialDF
.toDS.groupBy("_1","_2","_3")
.agg(max(col("_5"))).show
如果你想要最大col的相应colD:
initialDF.toDS.map(x=>x._1,x._2,x._3,x._5,x._4))).groupBy("_1","_2","_3")
.agg(max(col("_4")).as("_4")).select(col("_1"),col("_2"),col("_3"),col("_4._2"),col("_4._1")).show
对于 ReduceByKey,您可以将数据集转换为 pairRDD,然后处理 it.Should 更快,以防 Catalyst 无法优化第一个中的 groupByKey。参考
我已经使用 spark window functions 得到我的解决方案:
val window = Window
.partitionBy(dataframe("A"), dataframe("B"),dataframe("C"))
.orderBy(dataframe("E") desc)
val dfWithRowNumber = dataframe.withColumn("row_number", row_number() over window)
val filteredDf = dfWithRowNumber.filter(dfWithRowNumber("row_number") === 1)
假设,我有一个如下所示的数据框:
在这里,您可以看到交易编号 1、2 和 3 在 A、B、C 列中具有相同的值,但在 D 和 E 列中具有不同的值。E 列具有日期条目。
- 对于相同的 A、B 和 C 组合 (A=1,B=1,C=1),我们有 3 行。我想根据 E 列的最近交易日期只取一行,这意味着具有最新日期的行。但是对于最近的日期,有 2 笔交易。但是,如果在 E 列中找到 A、B、C 和最近日期的相同组合的两行或更多行,我只想取其中之一。 所以我对这个组合的预期输出将是行号 3 或 4(任何一个都可以)。
- 对于相同的 A、B 和 C 组合 (A=2,B=2,C=2),我们有 2 行。但是根据 E 列,最近的日期是第 5 行的日期。所以我们将只用这一行来表示 A、B 和 C 的组合。 所以我对这个组合的预期输出将是行号 5
所以最终输出将是(3和5)或(4和5)。
现在应该怎么处理:
- 我读了这个:
Both reduceByKey and groupByKey can be used for the same purposes but reduceByKey works much better on a large dataset. That’s because Spark knows it can combine output with a common key on each partition before shuffling the data.
- 我尝试在 A、B、C 列上使用 groupBy,在 E 列上使用 max。但是如果同一日期出现多行,它无法提供行首。
解决此问题的最优化方法是什么?在此先感谢。
编辑: 我需要取回过滤后的交易。还有怎么做?
Link 可以通过几个步骤。聚合数据框:
val agregatedDF=initialDF.select("A","B","C","E").groupBy("A","B","C").agg(max("E").as("E_max"))
Link 初始聚合:
initialDF.join(agregatedDF, List("A","B","C"))
如果初始DataFrame来自Hive,都可以简化。
val initialDF = Seq((1,1,1,1,"2/28/2017 0:00"),(1,1,1,2,"3/1/2017 0:00"),
(1,1,1,3,"3/1/2017 0:00"),(2,2,2,1,"2/28/2017 0:00"),(2,2,2,2,"2/25/20170:00"))
这将错过相应的 col(D)
initialDF
.toDS.groupBy("_1","_2","_3")
.agg(max(col("_5"))).show
如果你想要最大col的相应colD:
initialDF.toDS.map(x=>x._1,x._2,x._3,x._5,x._4))).groupBy("_1","_2","_3")
.agg(max(col("_4")).as("_4")).select(col("_1"),col("_2"),col("_3"),col("_4._2"),col("_4._1")).show
对于 ReduceByKey,您可以将数据集转换为 pairRDD,然后处理 it.Should 更快,以防 Catalyst 无法优化第一个中的 groupByKey。参考
我已经使用 spark window functions 得到我的解决方案:
val window = Window
.partitionBy(dataframe("A"), dataframe("B"),dataframe("C"))
.orderBy(dataframe("E") desc)
val dfWithRowNumber = dataframe.withColumn("row_number", row_number() over window)
val filteredDf = dfWithRowNumber.filter(dfWithRowNumber("row_number") === 1)