如何删除在特定列上具有相同值的重复记录并使用 pyspark 保留具有最高时间戳的记录
How to drop duplicates records that have the same value on a specific column and retain the one with the highest timestamp using pyspark
我试过下面的代码。下面脚本的想法是按 id 和时间戳对记录进行排序,并按 processed_timestamp 降序排列,但是当我尝试 运行 查询时,它没有按降序排列记录时间戳。它甚至删除最新的记录并保留不应该出现的旧记录
df2 = df_concat.orderBy("id", "processed_timestamp", f.col("processed_timestamp").desc()).dropDuplicates(["id"])
我也尝试了下面的方法,但是当我尝试将其转换回数据框时,table 模式现在不同了,记录现在位于一个列中并用逗号分隔。它还会删除最新记录并保留不应该出现的旧记录
def selectRowByTimeStamp(x,y):
if x.processed_timestamp > y.processed_timestamp:
return x
return y
dataMap = df_concat.rdd.map(lambda x: (x.id, x))
newdata = dataMap.reduceByKey(selectRowByTimeStamp)
我不确定我是否正确理解了上述代码的工作原理。
如果不是一个简单的错误,您的代码将按预期工作。
你不应该使用列名称 "processed_timestamp"
两次:
df2 = df_concat.orderBy(
"id", f.col("processed_timestamp").desc()
).dropDuplicates(["id"])
您的代码按 processed_timestamp
升序对 DataFrame 进行排序,因为原始列名排在第一位。
我试过下面的代码。下面脚本的想法是按 id 和时间戳对记录进行排序,并按 processed_timestamp 降序排列,但是当我尝试 运行 查询时,它没有按降序排列记录时间戳。它甚至删除最新的记录并保留不应该出现的旧记录
df2 = df_concat.orderBy("id", "processed_timestamp", f.col("processed_timestamp").desc()).dropDuplicates(["id"])
我也尝试了下面的方法,但是当我尝试将其转换回数据框时,table 模式现在不同了,记录现在位于一个列中并用逗号分隔。它还会删除最新记录并保留不应该出现的旧记录
def selectRowByTimeStamp(x,y):
if x.processed_timestamp > y.processed_timestamp:
return x
return y
dataMap = df_concat.rdd.map(lambda x: (x.id, x))
newdata = dataMap.reduceByKey(selectRowByTimeStamp)
我不确定我是否正确理解了上述代码的工作原理。
如果不是一个简单的错误,您的代码将按预期工作。
你不应该使用列名称 "processed_timestamp"
两次:
df2 = df_concat.orderBy(
"id", f.col("processed_timestamp").desc()
).dropDuplicates(["id"])
您的代码按 processed_timestamp
升序对 DataFrame 进行排序,因为原始列名排在第一位。