spark:如何在保持时间戳最高的行的同时对数据框执行 dropDuplicates
spark: How to do a dropDuplicates on a dataframe while keeping the highest timestamped row
我有一个用例,我需要删除数据框的重复行(在这种情况下,重复意味着它们具有相同的 'id' 字段),同时保持最高的行 'timestamp'(unix 时间戳)字段。
我找到了 drop_duplicate 方法(我使用的是 pyspark),但无法控制要保留的项目。
有人可以帮忙吗?提前致谢
可能需要手动 map 和 reduce 才能提供您想要的功能。
def selectRowByTimeStamp(x,y):
if x.timestamp > y.timestamp:
return x
return y
dataMap = data.map(lambda x: (x.id, x))
uniqueData = dataMap.reduceByKey(selectRowByTimeStamp)
这里我们根据 id 对所有数据进行分组。然后,当我们减少分组时,我们通过保留具有最高时间戳的记录来这样做。当代码减少后,每个id只剩下1条记录。
你可以这样做:
val df = Seq(
(1,12345678,"this is a test"),
(1,23456789, "another test"),
(2,2345678,"2nd test"),
(2,1234567, "2nd another test")
).toDF("id","timestamp","data")
+---+---------+----------------+
| id|timestamp| data|
+---+---------+----------------+
| 1| 12345678| this is a test|
| 1| 23456789| another test|
| 2| 2345678| 2nd test|
| 2| 1234567|2nd another test|
+---+---------+----------------+
df.join(
df.groupBy($"id").agg(max($"timestamp") as "r_timestamp").withColumnRenamed("id", "r_id"),
$"id" === $"r_id" && $"timestamp" === $"r_timestamp"
).drop("r_id").drop("r_timestamp").show
+---+---------+------------+
| id|timestamp| data|
+---+---------+------------+
| 1| 23456789|another test|
| 2| 2345678| 2nd test|
+---+---------+------------+
如果您希望 id
可以重复 timestamp
(请参阅下面的评论),您可以这样做:
df.dropDuplicates(Seq("id", "timestamp")).join(
df.groupBy($"id").agg(max($"timestamp") as "r_timestamp").withColumnRenamed("id", "r_id"),
$"id" === $"r_id" && $"timestamp" === $"r_timestamp"
).drop("r_id").drop("r_timestamp").show
我有一个用例,我需要删除数据框的重复行(在这种情况下,重复意味着它们具有相同的 'id' 字段),同时保持最高的行 'timestamp'(unix 时间戳)字段。
我找到了 drop_duplicate 方法(我使用的是 pyspark),但无法控制要保留的项目。
有人可以帮忙吗?提前致谢
可能需要手动 map 和 reduce 才能提供您想要的功能。
def selectRowByTimeStamp(x,y):
if x.timestamp > y.timestamp:
return x
return y
dataMap = data.map(lambda x: (x.id, x))
uniqueData = dataMap.reduceByKey(selectRowByTimeStamp)
这里我们根据 id 对所有数据进行分组。然后,当我们减少分组时,我们通过保留具有最高时间戳的记录来这样做。当代码减少后,每个id只剩下1条记录。
你可以这样做:
val df = Seq(
(1,12345678,"this is a test"),
(1,23456789, "another test"),
(2,2345678,"2nd test"),
(2,1234567, "2nd another test")
).toDF("id","timestamp","data")
+---+---------+----------------+
| id|timestamp| data|
+---+---------+----------------+
| 1| 12345678| this is a test|
| 1| 23456789| another test|
| 2| 2345678| 2nd test|
| 2| 1234567|2nd another test|
+---+---------+----------------+
df.join(
df.groupBy($"id").agg(max($"timestamp") as "r_timestamp").withColumnRenamed("id", "r_id"),
$"id" === $"r_id" && $"timestamp" === $"r_timestamp"
).drop("r_id").drop("r_timestamp").show
+---+---------+------------+
| id|timestamp| data|
+---+---------+------------+
| 1| 23456789|another test|
| 2| 2345678| 2nd test|
+---+---------+------------+
如果您希望 id
可以重复 timestamp
(请参阅下面的评论),您可以这样做:
df.dropDuplicates(Seq("id", "timestamp")).join(
df.groupBy($"id").agg(max($"timestamp") as "r_timestamp").withColumnRenamed("id", "r_id"),
$"id" === $"r_id" && $"timestamp" === $"r_timestamp"
).drop("r_id").drop("r_timestamp").show