Pyspark 数据框删除 AWS Glue 脚本中的重复项
Pyspark dataframe remove duplicate in AWS Glue Script
我在 AWS Glue ETL 作业中有一个脚本,它读取一个包含大量镶木地板文件的 S3 存储桶,按 key1、key2 和时间戳字段进行排序。之后脚本删除重复项并将单个镶木地板文件保存在其他 S3 存储桶中。
查看我在作业运行前的数据:
key1
key2
uploadTimestamp
0005541779
10
2021-12-29 14:54:08.753
0005541779
10
2021-12-29 15:06:05.968
排序和剔除重复代码:
#############################################################
tempDF = S3bucket_node1.toDF() #from Dynamic Frame to Data Frame
sortedDF = tempDF.orderBy(f.col("uploadTimestamp").desc(),"key1","key2").dropDuplicates(["key1","key2"]) #sort and remove duplicates
dynamicFrame = DynamicFrame.fromDF(sortedDF, glueContext, 'salesOrder') #back to Dynamic Frame
#############################################################
通过以下方式下单后查看此图片:
我的问题:
在输出文件中,一些数据获得了最后一个时间戳,一些数据获得了第一个时间戳。我不明白为什么它不适用于所有数据。
谢谢。
它适用于以下代码:
tempDF = S3bucket_node1.toDF()
w = Window.partitionBy("key1","key2").orderBy(f.desc("uploadTimestamp"))
df = tempDF.withColumn("rn", f.row_number().over(w)).filter("rn = 1").drop("rn")
dynamicFrame = DynamicFrame.fromDF(df, glueContext, 'dynamicFrame')
解决该问题的技巧可在此处找到:
我在 AWS Glue ETL 作业中有一个脚本,它读取一个包含大量镶木地板文件的 S3 存储桶,按 key1、key2 和时间戳字段进行排序。之后脚本删除重复项并将单个镶木地板文件保存在其他 S3 存储桶中。
查看我在作业运行前的数据:
key1 | key2 | uploadTimestamp |
---|---|---|
0005541779 | 10 | 2021-12-29 14:54:08.753 |
0005541779 | 10 | 2021-12-29 15:06:05.968 |
排序和剔除重复代码:
#############################################################
tempDF = S3bucket_node1.toDF() #from Dynamic Frame to Data Frame
sortedDF = tempDF.orderBy(f.col("uploadTimestamp").desc(),"key1","key2").dropDuplicates(["key1","key2"]) #sort and remove duplicates
dynamicFrame = DynamicFrame.fromDF(sortedDF, glueContext, 'salesOrder') #back to Dynamic Frame
#############################################################
通过以下方式下单后查看此图片:
我的问题: 在输出文件中,一些数据获得了最后一个时间戳,一些数据获得了第一个时间戳。我不明白为什么它不适用于所有数据。
谢谢。
它适用于以下代码:
tempDF = S3bucket_node1.toDF()
w = Window.partitionBy("key1","key2").orderBy(f.desc("uploadTimestamp"))
df = tempDF.withColumn("rn", f.row_number().over(w)).filter("rn = 1").drop("rn")
dynamicFrame = DynamicFrame.fromDF(df, glueContext, 'dynamicFrame')
解决该问题的技巧可在此处找到: