删除 spark 数据框中重复的所有记录
Remove all records which are duplicate in spark dataframe
我有一个包含多列的 spark 数据框。我想找出并删除列中具有重复值的行(其他列可以不同)。
我尝试使用 dropDuplicates(col_name)
,但它只会删除重复的条目,但仍会在数据框中保留一条记录。我需要的是删除所有最初包含重复条目的条目。
我正在使用 Spark 1.6 和 Scala 2.10。
这可以通过按列(或列)分组以查找重复项,然后汇总和筛选结果来完成。
示例数据框 df
:
+---+---+
| id|num|
+---+---+
| 1| 1|
| 2| 2|
| 3| 3|
| 4| 4|
| 4| 5|
+---+---+
按 id
列分组以删除重复项(最后两行):
val df2 = df.groupBy("id")
.agg(first($"num").as("num"), count($"id").as("count"))
.filter($"count" === 1)
.select("id", "num")
这会给你:
+---+---+
| id|num|
+---+---+
| 1| 1|
| 2| 2|
| 3| 3|
+---+---+
或者,可以使用 join
来完成。它会更慢,但如果有很多列,则无需为每个列使用 first($"num").as("num")
来保留它们。
val df2 = df.groupBy("id").agg(count($"id").as("count")).filter($"count" === 1).select("id")
val df3 = df.join(df2, Seq("id"), "inner")
我会为此使用 window-functions。假设您要删除重复的 id
行:
import org.apache.spark.sql.expressions.Window
df
.withColumn("cnt", count("*").over(Window.partitionBy($"id")))
.where($"cnt"===1).drop($"cnt")
.show()
我向使用@Raphael Roth 解决方案的开源 spark-daria 库添加了一个 killDuplicates()
方法。代码使用方法如下:
import com.github.mrpowers.spark.daria.sql.DataFrameExt._
df.killDuplicates(col("id"))
// you can also supply multiple Column arguments
df.killDuplicates(col("id"), col("another_column"))
代码实现如下:
object DataFrameExt {
implicit class DataFrameMethods(df: DataFrame) {
def killDuplicates(cols: Column*): DataFrame = {
df
.withColumn(
"my_super_secret_count",
count("*").over(Window.partitionBy(cols: _*))
)
.where(col("my_super_secret_count") === 1)
.drop(col("my_super_secret_count"))
}
}
}
您可能希望利用 spark-daria 库将此逻辑排除在您的代码库之外。
我有一个包含多列的 spark 数据框。我想找出并删除列中具有重复值的行(其他列可以不同)。
我尝试使用 dropDuplicates(col_name)
,但它只会删除重复的条目,但仍会在数据框中保留一条记录。我需要的是删除所有最初包含重复条目的条目。
我正在使用 Spark 1.6 和 Scala 2.10。
这可以通过按列(或列)分组以查找重复项,然后汇总和筛选结果来完成。
示例数据框 df
:
+---+---+
| id|num|
+---+---+
| 1| 1|
| 2| 2|
| 3| 3|
| 4| 4|
| 4| 5|
+---+---+
按 id
列分组以删除重复项(最后两行):
val df2 = df.groupBy("id")
.agg(first($"num").as("num"), count($"id").as("count"))
.filter($"count" === 1)
.select("id", "num")
这会给你:
+---+---+
| id|num|
+---+---+
| 1| 1|
| 2| 2|
| 3| 3|
+---+---+
或者,可以使用 join
来完成。它会更慢,但如果有很多列,则无需为每个列使用 first($"num").as("num")
来保留它们。
val df2 = df.groupBy("id").agg(count($"id").as("count")).filter($"count" === 1).select("id")
val df3 = df.join(df2, Seq("id"), "inner")
我会为此使用 window-functions。假设您要删除重复的 id
行:
import org.apache.spark.sql.expressions.Window
df
.withColumn("cnt", count("*").over(Window.partitionBy($"id")))
.where($"cnt"===1).drop($"cnt")
.show()
我向使用@Raphael Roth 解决方案的开源 spark-daria 库添加了一个 killDuplicates()
方法。代码使用方法如下:
import com.github.mrpowers.spark.daria.sql.DataFrameExt._
df.killDuplicates(col("id"))
// you can also supply multiple Column arguments
df.killDuplicates(col("id"), col("another_column"))
代码实现如下:
object DataFrameExt {
implicit class DataFrameMethods(df: DataFrame) {
def killDuplicates(cols: Column*): DataFrame = {
df
.withColumn(
"my_super_secret_count",
count("*").over(Window.partitionBy(cols: _*))
)
.where(col("my_super_secret_count") === 1)
.drop(col("my_super_secret_count"))
}
}
}
您可能希望利用 spark-daria 库将此逻辑排除在您的代码库之外。