AWS Glue 作业书签为 csv 文件生成重复项
AWS Glue job bookmark produces duplicates for csv files
我们每天上午 11 点在 s3 存储桶中从供应商处收到 1 个 csv 文件。
我在 11:30am.
使用 Glue 将此文件转换为 parquet 格式
我已启用作业书签以不处理已处理的文件。
尽管如此,我还是看到一些文件正在被重新处理,因此产生了重复。
我阅读了这些问题和答案 and AWS Glue Job Bookmarking explanation
他们很好地理解了工作书签,但仍然没有解决这个问题。
AWS 文档说,它支持 CSV 文件作为书签 AWS documentation。
想知道是否有人可以帮助我了解可能是什么问题以及可能的解决方案:)
编辑:
根据 Prabhakar 的要求在此处粘贴示例代码。
staging_database_name = "my-glue-db"
s3_target_path = "s3://mybucket/mydata/"
"""
'date_index': date location in the file name
'date_only': only date column is inserted
'date_format': format of date
'path': sub folder name in master bucket
"""
#fouo classified files
tables_spec = {
'sample_table': {'path': 'sample_table/load_date=','pkey': 'mykey', 'orderkey':'myorderkey'}
}
spark_conf = SparkConf().setAll([
("spark.hadoop.fs.s3.enableServerSideEncryption", "true"),
("spark.hadoop.fs.s3.serverSideEncryption.kms.keyId", kms_key_id)
])
sc = SparkContext(conf=spark_conf)
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
for table_name, spec in tables_spec.items():
datasource0 = glueContext.create_dynamic_frame.from_catalog(database=database_name,
table_name=table_name,
transformation_ctx='datasource0')
resolvechoice2 = ResolveChoice.apply(frame=datasource0, choice="make_struct", transformation_ctx='resolvechoice2')
# Create spark data frame with input_file_name column
delta_df = resolvechoice2.toDF().withColumn('ingest_datetime', lit(str(ingest_datetime)))
date_dyf = DynamicFrame.fromDF(delta_df, glueContext, "date_dyf")
master_folder_path1 = os.path.join(s3_target_path, spec['path']).replace('\', '/')
master_folder_path=master_folder_path1+load_date
datasink4 = glueContext.write_dynamic_frame.from_options(frame=date_dyf,
connection_type='s3',
connection_options={"path": master_folder_path},
format='parquet', transformation_ctx='datasink4')
job.commit()
与 AWS Support 工程师交谈,她提到,她能够重现该问题,并已将其提交给 Glue 技术团队寻求解决方案。
尽管如此,我等不及他们修复错误并采取了不同的方法。
解决方案:
- 禁用 Glue 书签
- Glue 作业将 csv 文件转换为 Parquet 后,我
将 csv 文件移动到 S3 存储桶中的不同位置。
我们每天上午 11 点在 s3 存储桶中从供应商处收到 1 个 csv 文件。 我在 11:30am.
使用 Glue 将此文件转换为 parquet 格式我已启用作业书签以不处理已处理的文件。 尽管如此,我还是看到一些文件正在被重新处理,因此产生了重复。
我阅读了这些问题和答案
他们很好地理解了工作书签,但仍然没有解决这个问题。
AWS 文档说,它支持 CSV 文件作为书签 AWS documentation。
想知道是否有人可以帮助我了解可能是什么问题以及可能的解决方案:)
编辑:
根据 Prabhakar 的要求在此处粘贴示例代码。
staging_database_name = "my-glue-db"
s3_target_path = "s3://mybucket/mydata/"
"""
'date_index': date location in the file name
'date_only': only date column is inserted
'date_format': format of date
'path': sub folder name in master bucket
"""
#fouo classified files
tables_spec = {
'sample_table': {'path': 'sample_table/load_date=','pkey': 'mykey', 'orderkey':'myorderkey'}
}
spark_conf = SparkConf().setAll([
("spark.hadoop.fs.s3.enableServerSideEncryption", "true"),
("spark.hadoop.fs.s3.serverSideEncryption.kms.keyId", kms_key_id)
])
sc = SparkContext(conf=spark_conf)
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
for table_name, spec in tables_spec.items():
datasource0 = glueContext.create_dynamic_frame.from_catalog(database=database_name,
table_name=table_name,
transformation_ctx='datasource0')
resolvechoice2 = ResolveChoice.apply(frame=datasource0, choice="make_struct", transformation_ctx='resolvechoice2')
# Create spark data frame with input_file_name column
delta_df = resolvechoice2.toDF().withColumn('ingest_datetime', lit(str(ingest_datetime)))
date_dyf = DynamicFrame.fromDF(delta_df, glueContext, "date_dyf")
master_folder_path1 = os.path.join(s3_target_path, spec['path']).replace('\', '/')
master_folder_path=master_folder_path1+load_date
datasink4 = glueContext.write_dynamic_frame.from_options(frame=date_dyf,
connection_type='s3',
connection_options={"path": master_folder_path},
format='parquet', transformation_ctx='datasink4')
job.commit()
与 AWS Support 工程师交谈,她提到,她能够重现该问题,并已将其提交给 Glue 技术团队寻求解决方案。
尽管如此,我等不及他们修复错误并采取了不同的方法。
解决方案:
- 禁用 Glue 书签
- Glue 作业将 csv 文件转换为 Parquet 后,我 将 csv 文件移动到 S3 存储桶中的不同位置。