AWS Glue Crawl 动态 S3 路径位置
AWS Glue Crawl Dynamic S3 Path Location
我正在 AWS Glue 中创建一个 ETL 作业,该作业将从 S3 位置提取存储库中每个实体的最新编辑或当前数据。存储库中的数据是实体所有编辑的历史记录。每天我 运行 ETL 并将其写入另一个 S3 位置,即 Bucket/path/to/files/current_date/... 当前日期是动态的并且与 ETL 运行 的日期一致。
我遇到的问题 运行 是我无法以编程方式从 S3 中删除(组织限制),或者移动文件,因为那是在幕后复制和删除,所以它也失败了,留下了胶水爬行的单一路径。我想设置爬虫,使路径的日期部分是动态的,但我无法找到一种方法来做到这一点——有人知道这是否可行吗?
我的数据按 run_date(请参阅上面的当前日期)以及其他 6 个分层分区进行分区。我正在通过 CloudFormation、yaml 语言创建爬虫和 ETL 作业。爬虫的路径存储为 CloudFormation 脚本中定义的 ssm 参数。
路径 SSM 参数示例
S3CurrentPath:
Type: AWS::SSM::Parameter
Properties:
Description: "Path in the S3 Lake where the current entity data is stored."
Type: String
Value: 'Data/Entities/Software/SoftwareCurrent'
Name: "/org/member/local/s3/path/entityCurrent"
爬虫资源代码:
GenericCrawler:
Type: AWS::Glue::Crawler
Properties:
Role: !Ref RoleNAme
Name: !Sub "${ProfileName}-crawler-${CrawlerName}"
Configuration: !Sub |
{
"Version": 1.0,
"CrawlerOutput": {
"Partitions": { "AddOrUpdateBehavior": "InheritFromTable" },
"Tables": { "AddOrUpdateBehavior": "MergeNewColumns" }
}
}
Description: !Ref CrawlerDescription
DatabaseName: !Ref DatabaseName
Targets:
S3Targets:
- Path: !Sub "s3://${S3DataBucket}/${S3Path}"
ETL DataSink 写入代码:
# Write the joined dynamic frame out to a datasink
datasink = glueContext.write_dynamic_frame.from_options(
frame = final_dynamic_frame, connection_type = "s3",
connection_options = {
'path': 's3://{lakeBucketName}/{lakePath}/'.format(
lakeBucketName=args['lakeBucketName'],
lakePath=args['lakeDestinationPath']),
"partitionKeys": ['run_date','location','year','month','day','hour','timestamp']},
format = "parquet",
transformation_ctx = "datasink")
我希望爬虫会查看存储库中的最新日期,即最近的 run_date 分区 "folder" 并在不回顾旧数据的情况下对其进行爬网。
如果您想查看更多代码,请告诉我——我很乐意整理并提供。
老实说,我还没有找到使用 AWS Glue read/write 数据到动态路径的方法。我通常做的是 read/write 使用 PySpark 方法:
datasink.write.\
format("com.databricks.spark.csv").\
option("header", "true").\
mode("overwrite").\
save("s3://my-bucket/files/" + current_date + "*.csv")
您甚至可以告诉该方法只 read/write 特定类型的文件(例如 .csv)。
PySpark 比 AWS Glue 有更多的选项和可用的方法,因此具有更大的灵活性。此外,我在 DynamoDB table 中添加了一条 key/value 记录以记录最新日期。
我正在 AWS Glue 中创建一个 ETL 作业,该作业将从 S3 位置提取存储库中每个实体的最新编辑或当前数据。存储库中的数据是实体所有编辑的历史记录。每天我 运行 ETL 并将其写入另一个 S3 位置,即 Bucket/path/to/files/current_date/... 当前日期是动态的并且与 ETL 运行 的日期一致。
我遇到的问题 运行 是我无法以编程方式从 S3 中删除(组织限制),或者移动文件,因为那是在幕后复制和删除,所以它也失败了,留下了胶水爬行的单一路径。我想设置爬虫,使路径的日期部分是动态的,但我无法找到一种方法来做到这一点——有人知道这是否可行吗?
我的数据按 run_date(请参阅上面的当前日期)以及其他 6 个分层分区进行分区。我正在通过 CloudFormation、yaml 语言创建爬虫和 ETL 作业。爬虫的路径存储为 CloudFormation 脚本中定义的 ssm 参数。
路径 SSM 参数示例
S3CurrentPath:
Type: AWS::SSM::Parameter
Properties:
Description: "Path in the S3 Lake where the current entity data is stored."
Type: String
Value: 'Data/Entities/Software/SoftwareCurrent'
Name: "/org/member/local/s3/path/entityCurrent"
爬虫资源代码:
GenericCrawler:
Type: AWS::Glue::Crawler
Properties:
Role: !Ref RoleNAme
Name: !Sub "${ProfileName}-crawler-${CrawlerName}"
Configuration: !Sub |
{
"Version": 1.0,
"CrawlerOutput": {
"Partitions": { "AddOrUpdateBehavior": "InheritFromTable" },
"Tables": { "AddOrUpdateBehavior": "MergeNewColumns" }
}
}
Description: !Ref CrawlerDescription
DatabaseName: !Ref DatabaseName
Targets:
S3Targets:
- Path: !Sub "s3://${S3DataBucket}/${S3Path}"
ETL DataSink 写入代码:
# Write the joined dynamic frame out to a datasink
datasink = glueContext.write_dynamic_frame.from_options(
frame = final_dynamic_frame, connection_type = "s3",
connection_options = {
'path': 's3://{lakeBucketName}/{lakePath}/'.format(
lakeBucketName=args['lakeBucketName'],
lakePath=args['lakeDestinationPath']),
"partitionKeys": ['run_date','location','year','month','day','hour','timestamp']},
format = "parquet",
transformation_ctx = "datasink")
我希望爬虫会查看存储库中的最新日期,即最近的 run_date 分区 "folder" 并在不回顾旧数据的情况下对其进行爬网。
如果您想查看更多代码,请告诉我——我很乐意整理并提供。
老实说,我还没有找到使用 AWS Glue read/write 数据到动态路径的方法。我通常做的是 read/write 使用 PySpark 方法:
datasink.write.\
format("com.databricks.spark.csv").\
option("header", "true").\
mode("overwrite").\
save("s3://my-bucket/files/" + current_date + "*.csv")
您甚至可以告诉该方法只 read/write 特定类型的文件(例如 .csv)。 PySpark 比 AWS Glue 有更多的选项和可用的方法,因此具有更大的灵活性。此外,我在 DynamoDB table 中添加了一条 key/value 记录以记录最新日期。