删除 spark 数据框中重复的所有记录

Remove all records which are duplicate in spark dataframe

我有一个包含多列的 spark 数据框。我想找出并删除列中具有重复值的行(其他列可以不同)。

我尝试使用 dropDuplicates(col_name),但它只会删除重复的条目,但仍会在数据框中保留一条记录。我需要的是删除所有最初包含重复条目的条目。

我正在使用 Spark 1.6 和 Scala 2.10。

这可以通过按列(或列)分组以查找重复项,然后汇总和筛选结果来完成。

示例数据框 df:

+---+---+
| id|num|
+---+---+
|  1|  1|
|  2|  2|
|  3|  3|
|  4|  4|
|  4|  5|
+---+---+

id 列分组以删除重复项(最后两行):

val df2 = df.groupBy("id")
  .agg(first($"num").as("num"), count($"id").as("count"))
  .filter($"count" === 1)
  .select("id", "num")

这会给你:

+---+---+
| id|num|
+---+---+
|  1|  1|
|  2|  2|
|  3|  3|
+---+---+

或者,可以使用 join 来完成。它会更慢,但如果有很多列,则无需为每个列使用 first($"num").as("num") 来保留它们。

val df2 = df.groupBy("id").agg(count($"id").as("count")).filter($"count" === 1).select("id")
val df3 = df.join(df2, Seq("id"), "inner")

我会为此使用 window-functions。假设您要删除重复的 id 行:

import org.apache.spark.sql.expressions.Window

df
  .withColumn("cnt", count("*").over(Window.partitionBy($"id")))
  .where($"cnt"===1).drop($"cnt")
  .show()

我向使用@Raphael Roth 解决方案的开源 spark-daria 库添加了一个 killDuplicates() 方法。代码使用方法如下:

import com.github.mrpowers.spark.daria.sql.DataFrameExt._

df.killDuplicates(col("id"))

// you can also supply multiple Column arguments
df.killDuplicates(col("id"), col("another_column"))

代码实现如下:

object DataFrameExt {

  implicit class DataFrameMethods(df: DataFrame) {

    def killDuplicates(cols: Column*): DataFrame = {
      df
        .withColumn(
          "my_super_secret_count",
          count("*").over(Window.partitionBy(cols: _*))
        )
        .where(col("my_super_secret_count") === 1)
        .drop(col("my_super_secret_count"))
    }

  }

}

您可能希望利用 spark-daria 库将此逻辑排除在您的代码库之外。