根据 RDD/Spark DataFrame 中的特定列从行中删除重复项
Removing duplicates from rows based on specific columns in an RDD/Spark DataFrame
假设我有一个相当大的数据集,格式如下:
data = sc.parallelize([('Foo',41,'US',3),
('Foo',39,'UK',1),
('Bar',57,'CA',2),
('Bar',72,'CA',2),
('Baz',22,'US',6),
('Baz',36,'US',6)])
我想做的是只根据第一、第三和第四列的值删除重复的行。
删除完全重复的行很简单:
data = data.distinct()
第 5 行或第 6 行将被删除
但是我如何只删除基于第 1、3 和 4 列的重复行?即删除其中之一:
('Baz',22,'US',6)
('Baz',36,'US',6)
在 Python 中,这可以通过使用 .drop_duplicates()
指定列来完成。我怎样才能在 Spark/Pyspark 中达到同样的效果?
根据您的问题,您不清楚要使用哪些列来确定重复项。该解决方案背后的总体思路是根据识别重复项的列的值创建一个键。然后,您可以使用 reduceByKey 或 reduce 操作来消除重复。
这里有一些代码可以帮助您入门:
def get_key(x):
return "{0}{1}{2}".format(x[0],x[2],x[3])
m = data.map(lambda x: (get_key(x),x))
现在,您有一个键值 RDD
,由第 1,3 和 4 列作为键值。
下一步将是 reduceByKey
或 groupByKey
和 filter
。
这将消除重复项。
r = m.reduceByKey(lambda x,y: (x))
我知道您已经接受了另一个答案,但是如果您想这样做
DataFrame,就用groupBy和agg。假设您已经创建了一个 DF(具有名为 "col1"、"col2" 等的列),您可以这样做:
myDF.groupBy($"col1", $"col3", $"col4").agg($"col1", max($"col2"), $"col3", $"col4")
请注意,在本例中,我选择了 col2 的 Max,但您可以选择 avg、min 等。
同意大卫的观点。另外,可能不会是我们想要 groupBy 除了聚合函数中的列之外的所有列的情况,即,如果我们希望完全基于列的子集删除重复项并保留原始数据框中的所有列。因此,更好的方法是使用 Spark 1.4.0
中可用的 dropDuplicates Dataframe api
参考:https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.sql.DataFrame
Pyspark 确实 包含一个 dropDuplicates()
方法,该方法在 1.4 中引入。 https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.dropDuplicates.html
>>> from pyspark.sql import Row
>>> df = sc.parallelize([ \
... Row(name='Alice', age=5, height=80), \
... Row(name='Alice', age=5, height=80), \
... Row(name='Alice', age=10, height=80)]).toDF()
>>> df.dropDuplicates().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 5| 80|Alice|
| 10| 80|Alice|
+---+------+-----+
>>> df.dropDuplicates(['name', 'height']).show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 5| 80|Alice|
+---+------+-----+
我使用了内置函数 dropDuplicates()。下面给出 Scala 代码
val data = sc.parallelize(List(("Foo",41,"US",3),
("Foo",39,"UK",1),
("Bar",57,"CA",2),
("Bar",72,"CA",2),
("Baz",22,"US",6),
("Baz",36,"US",6))).toDF("x","y","z","count")
data.dropDuplicates(Array("x","count")).show()
输出:
+---+---+---+-----+
| x| y| z|count|
+---+---+---+-----+
|Baz| 22| US| 6|
|Foo| 39| UK| 1|
|Foo| 41| US| 3|
|Bar| 57| CA| 2|
+---+---+---+-----+
这是我的 Df 包含 4 重复两次所以这里将删除重复的值。
scala> df.show
+-----+
|value|
+-----+
| 1|
| 4|
| 3|
| 5|
| 4|
| 18|
+-----+
scala> val newdf=df.dropDuplicates
scala> newdf.show
+-----+
|value|
+-----+
| 1|
| 3|
| 5|
| 4|
| 18|
+-----+
下面的程序将帮助您整体删除重复项,或者如果您想根据某些列删除重复项,您甚至可以这样做:
import org.apache.spark.sql.SparkSession
object DropDuplicates {
def main(args: Array[String]) {
val spark =
SparkSession.builder()
.appName("DataFrame-DropDuplicates")
.master("local[4]")
.getOrCreate()
import spark.implicits._
// create an RDD of tuples with some data
val custs = Seq(
(1, "Widget Co", 120000.00, 0.00, "AZ"),
(2, "Acme Widgets", 410500.00, 500.00, "CA"),
(3, "Widgetry", 410500.00, 200.00, "CA"),
(4, "Widgets R Us", 410500.00, 0.0, "CA"),
(3, "Widgetry", 410500.00, 200.00, "CA"),
(5, "Ye Olde Widgete", 500.00, 0.0, "MA"),
(6, "Widget Co", 12000.00, 10.00, "AZ")
)
val customerRows = spark.sparkContext.parallelize(custs, 4)
// convert RDD of tuples to DataFrame by supplying column names
val customerDF = customerRows.toDF("id", "name", "sales", "discount", "state")
println("*** Here's the whole DataFrame with duplicates")
customerDF.printSchema()
customerDF.show()
// drop fully identical rows
val withoutDuplicates = customerDF.dropDuplicates()
println("*** Now without duplicates")
withoutDuplicates.show()
val withoutPartials = customerDF.dropDuplicates(Seq("name", "state"))
println("*** Now without partial duplicates too")
withoutPartials.show()
}
}
假设我有一个相当大的数据集,格式如下:
data = sc.parallelize([('Foo',41,'US',3),
('Foo',39,'UK',1),
('Bar',57,'CA',2),
('Bar',72,'CA',2),
('Baz',22,'US',6),
('Baz',36,'US',6)])
我想做的是只根据第一、第三和第四列的值删除重复的行。
删除完全重复的行很简单:
data = data.distinct()
第 5 行或第 6 行将被删除
但是我如何只删除基于第 1、3 和 4 列的重复行?即删除其中之一:
('Baz',22,'US',6)
('Baz',36,'US',6)
在 Python 中,这可以通过使用 .drop_duplicates()
指定列来完成。我怎样才能在 Spark/Pyspark 中达到同样的效果?
根据您的问题,您不清楚要使用哪些列来确定重复项。该解决方案背后的总体思路是根据识别重复项的列的值创建一个键。然后,您可以使用 reduceByKey 或 reduce 操作来消除重复。
这里有一些代码可以帮助您入门:
def get_key(x):
return "{0}{1}{2}".format(x[0],x[2],x[3])
m = data.map(lambda x: (get_key(x),x))
现在,您有一个键值 RDD
,由第 1,3 和 4 列作为键值。
下一步将是 reduceByKey
或 groupByKey
和 filter
。
这将消除重复项。
r = m.reduceByKey(lambda x,y: (x))
我知道您已经接受了另一个答案,但是如果您想这样做 DataFrame,就用groupBy和agg。假设您已经创建了一个 DF(具有名为 "col1"、"col2" 等的列),您可以这样做:
myDF.groupBy($"col1", $"col3", $"col4").agg($"col1", max($"col2"), $"col3", $"col4")
请注意,在本例中,我选择了 col2 的 Max,但您可以选择 avg、min 等。
同意大卫的观点。另外,可能不会是我们想要 groupBy 除了聚合函数中的列之外的所有列的情况,即,如果我们希望完全基于列的子集删除重复项并保留原始数据框中的所有列。因此,更好的方法是使用 Spark 1.4.0
中可用的 dropDuplicates Dataframe api参考:https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.sql.DataFrame
Pyspark 确实 包含一个 dropDuplicates()
方法,该方法在 1.4 中引入。 https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.dropDuplicates.html
>>> from pyspark.sql import Row
>>> df = sc.parallelize([ \
... Row(name='Alice', age=5, height=80), \
... Row(name='Alice', age=5, height=80), \
... Row(name='Alice', age=10, height=80)]).toDF()
>>> df.dropDuplicates().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 5| 80|Alice|
| 10| 80|Alice|
+---+------+-----+
>>> df.dropDuplicates(['name', 'height']).show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 5| 80|Alice|
+---+------+-----+
我使用了内置函数 dropDuplicates()。下面给出 Scala 代码
val data = sc.parallelize(List(("Foo",41,"US",3),
("Foo",39,"UK",1),
("Bar",57,"CA",2),
("Bar",72,"CA",2),
("Baz",22,"US",6),
("Baz",36,"US",6))).toDF("x","y","z","count")
data.dropDuplicates(Array("x","count")).show()
输出:
+---+---+---+-----+
| x| y| z|count|
+---+---+---+-----+
|Baz| 22| US| 6|
|Foo| 39| UK| 1|
|Foo| 41| US| 3|
|Bar| 57| CA| 2|
+---+---+---+-----+
这是我的 Df 包含 4 重复两次所以这里将删除重复的值。
scala> df.show
+-----+
|value|
+-----+
| 1|
| 4|
| 3|
| 5|
| 4|
| 18|
+-----+
scala> val newdf=df.dropDuplicates
scala> newdf.show
+-----+
|value|
+-----+
| 1|
| 3|
| 5|
| 4|
| 18|
+-----+
下面的程序将帮助您整体删除重复项,或者如果您想根据某些列删除重复项,您甚至可以这样做:
import org.apache.spark.sql.SparkSession
object DropDuplicates {
def main(args: Array[String]) {
val spark =
SparkSession.builder()
.appName("DataFrame-DropDuplicates")
.master("local[4]")
.getOrCreate()
import spark.implicits._
// create an RDD of tuples with some data
val custs = Seq(
(1, "Widget Co", 120000.00, 0.00, "AZ"),
(2, "Acme Widgets", 410500.00, 500.00, "CA"),
(3, "Widgetry", 410500.00, 200.00, "CA"),
(4, "Widgets R Us", 410500.00, 0.0, "CA"),
(3, "Widgetry", 410500.00, 200.00, "CA"),
(5, "Ye Olde Widgete", 500.00, 0.0, "MA"),
(6, "Widget Co", 12000.00, 10.00, "AZ")
)
val customerRows = spark.sparkContext.parallelize(custs, 4)
// convert RDD of tuples to DataFrame by supplying column names
val customerDF = customerRows.toDF("id", "name", "sales", "discount", "state")
println("*** Here's the whole DataFrame with duplicates")
customerDF.printSchema()
customerDF.show()
// drop fully identical rows
val withoutDuplicates = customerDF.dropDuplicates()
println("*** Now without duplicates")
withoutDuplicates.show()
val withoutPartials = customerDF.dropDuplicates(Seq("name", "state"))
println("*** Now without partial duplicates too")
withoutPartials.show()
}
}