在 Glue 数据目录中为 S3 和未知架构中的数据创建表
Create tables in Glue Data Catalog for data in S3 and unknown schema
我当前的用例是,在基于 ETL 的服务中(NOTE
:ETL 服务不使用 Glue ETL,它是一项独立服务),我从 AWS Redshift 集群获取一些数据到S3。然后将 S3 中的数据馈送到 T 和 L 作业中。我想将元数据填充到 Glue 目录中。最基本的解决方案是使用 Glue Crawler,但爬虫 运行s 大约需要 1 小时 20 分钟(很多 s3 分区)。我遇到的另一个解决方案是使用 Glue API's。但是,我同样面临着数据类型定义的问题。
有什么办法,我可以 create/update 我在 S3 中有数据的 Glue 目录表,数据类型仅在提取过程中已知。
而且,当 T 和 L 作业正在 运行 时,数据类型应该在目录中随时可用。
为了在 ETL 过程中创建、更新数据目录,您可以使用以下方法:
更新:
additionalOptions = {"enableUpdateCatalog": True, "updateBehavior": "UPDATE_IN_DATABASE"}
additionalOptions["partitionKeys"] = ["partition_key0", "partition_key1"]
sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<dst_db_name>,
table_name=<dst_tbl_name>, transformation_ctx="write_sink",
additional_options=additionalOptions)
job.commit()
以上可用于更新架构。您还可以选择在 LOG
或 UPDATE_IN_DATABASE
(default) 之间选择设置 updateBehavior
。
创建
要在 ETL 期间在数据目录中创建新的 table,您可以按照以下示例操作:
sink = glueContext.getSink(connection_type="s3", path="s3://path/to/data",
enableUpdateCatalog=True, updateBehavior="UPDATE_IN_DATABASE",
partitionKeys=["partition_key0", "partition_key1"])
sink.setFormat("<format>")
sink.setCatalogInfo(catalogDatabase=<dst_db_name>, catalogTableName=<dst_tbl_name>)
sink.writeFrame(last_transform)
您可以使用 setCatalogInfo
指定数据库和新的 table 名称。
您还可以选择使用 enableUpdateCatalog
参数 更新数据目录中的分区 ,然后指定 partitionKeys
.
可以找到有关该功能的更详细说明 here。
找到问题的解决方案,我最终利用 Glue Catalog API 使其无缝且快速。
我创建了一个与 Glue Catalog 交互的接口,并为各种数据源覆盖了这些方法。数据加载到 S3 后,我立即启动查询以从源中获取架构,然后界面开始工作。
我当前的用例是,在基于 ETL 的服务中(NOTE
:ETL 服务不使用 Glue ETL,它是一项独立服务),我从 AWS Redshift 集群获取一些数据到S3。然后将 S3 中的数据馈送到 T 和 L 作业中。我想将元数据填充到 Glue 目录中。最基本的解决方案是使用 Glue Crawler,但爬虫 运行s 大约需要 1 小时 20 分钟(很多 s3 分区)。我遇到的另一个解决方案是使用 Glue API's。但是,我同样面临着数据类型定义的问题。
有什么办法,我可以 create/update 我在 S3 中有数据的 Glue 目录表,数据类型仅在提取过程中已知。
而且,当 T 和 L 作业正在 运行 时,数据类型应该在目录中随时可用。
为了在 ETL 过程中创建、更新数据目录,您可以使用以下方法:
更新:
additionalOptions = {"enableUpdateCatalog": True, "updateBehavior": "UPDATE_IN_DATABASE"}
additionalOptions["partitionKeys"] = ["partition_key0", "partition_key1"]
sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<dst_db_name>,
table_name=<dst_tbl_name>, transformation_ctx="write_sink",
additional_options=additionalOptions)
job.commit()
以上可用于更新架构。您还可以选择在 LOG
或 UPDATE_IN_DATABASE
(default) 之间选择设置 updateBehavior
。
创建
要在 ETL 期间在数据目录中创建新的 table,您可以按照以下示例操作:
sink = glueContext.getSink(connection_type="s3", path="s3://path/to/data",
enableUpdateCatalog=True, updateBehavior="UPDATE_IN_DATABASE",
partitionKeys=["partition_key0", "partition_key1"])
sink.setFormat("<format>")
sink.setCatalogInfo(catalogDatabase=<dst_db_name>, catalogTableName=<dst_tbl_name>)
sink.writeFrame(last_transform)
您可以使用 setCatalogInfo
指定数据库和新的 table 名称。
您还可以选择使用 enableUpdateCatalog
参数 更新数据目录中的分区 ,然后指定 partitionKeys
.
可以找到有关该功能的更详细说明 here。
找到问题的解决方案,我最终利用 Glue Catalog API 使其无缝且快速。 我创建了一个与 Glue Catalog 交互的接口,并为各种数据源覆盖了这些方法。数据加载到 S3 后,我立即启动查询以从源中获取架构,然后界面开始工作。