How can I execute a Glue ETL job (conversion to parquet from my raw zone to processed) for all tables within the raw zone?

I am currently automating my data lake ingestion process. Data lands in my raw zone (an S3 bucket). Within the bucket I have 27 folders, one per database, and each folder contains x csv files, one per table. An S3 event (on all object create events) triggers a Lambda function that runs a crawler over my raw zone, and I can successfully see every table in the catalog. Once that is done, I want an ETL job that converts the data to parquet in my processed zone, but given the number of tables I have, I don't want to create the job by hand and specify every table as a "source".
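For reference, a minimal sketch of what that Lambda handler might look like. It only relies on boto3's Glue client (whose start_crawler call does exist); the crawler name is a placeholder:

import boto3

glue = boto3.client('glue')

def lambda_handler(event, context):
    # Fired by the S3 "all object create events" notification;
    # start the raw-zone crawler so new csv files get catalogued.
    try:
        glue.start_crawler(Name='raw-zone-crawler')  # placeholder crawler name
    except glue.exceptions.CrawlerRunningException:
        pass  # a crawl is already running; the new object will be picked up
    return {'statusCode': 200}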

I demonstrated the automation by uploading a single csv file to my raw zone; the crawler ran, and the ETL job then ran as well, converting the "s3 raw zone table" to parquet and landing it in my processed zone. When I dropped in a second table, the crawler correctly recognized it as a new table in my raw zone, but in my processed zone the data was merged into the first table's schema (even though the two schemas are completely different).
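One way to keep the processed-zone schemas apart (a sketch of a possible fix, not something confirmed by the original setup) is to register each table's folder as its own S3 target when defining the processed-zone crawler, so it never treats two folders with different schemas as one table. The crawler name, role, database, and folder names below are all placeholders:

import boto3

glue = boto3.client('glue')

# One S3 target per table folder, so the crawler does not merge
# folders with different schemas into a single table.
table_folders = ['table_a', 'table_b']  # placeholder table names
glue.create_crawler(
    Name='processed-zone-crawler',                          # placeholder
    Role='arn:aws:iam::123456789012:role/GlueCrawlerRole',  # placeholder
    DatabaseName='application_xyz_processed',               # placeholder
    Targets={'S3Targets': [{'Path': 's3://processed-45ah4xoyqr1b/Application1/' + t + '/'} for t in table_folders]},
)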

The result I am after:

1) the crawler recognizes each csv as a table
2) the Glue ETL job converts each file to parquet
3) the crawler recognizes each parquet file as its own table

The code below highlights the problem I am facing: the specified data source is a single table (folder), and everything in that folder is assumed to share the same schema.

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "APPLICATION_XYZ", table_name = "RAW_ZONE_w1cqzldd5jpe", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("vendorid", "long", "vendorid", "long"), ("lpep_pickup_datetime", "string", "lpep_pickup_datetime", "string"), ("lpep_dropoff_datetime", "string", "lpep_dropoff_datetime", "string"), ("store_and_fwd_flag", "string", "store_and_fwd_flag", "string"), ("ratecodeid", "long", "ratecodeid", "long"), ("pulocationid", "long", "pulocationid", "long"), ("dolocationid", "long", "dolocationid", "long"), ("passenger_count", "long", "passenger_count", "long"), ("trip_distance", "double", "trip_distance", "double"), ("fare_amount", "double", "fare_amount", "double"), ("extra", "double", "extra", "double"), ("mta_tax", "double", "mta_tax", "double"), ("tip_amount", "double", "tip_amount", "double"), ("tolls_amount", "double", "tolls_amount", "double"), ("ehail_fee", "string", "ehail_fee", "string"), ("improvement_surcharge", "double", "improvement_surcharge", "double"), ("total_amount", "double", "total_amount", "double"), ("payment_type", "long", "payment_type", "long"), ("trip_type", "long", "trip_type", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
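For context, create_dynamic_frame.from_catalog binds the source to exactly one catalog table, i.e. one folder. Reading the folder straight from S3 makes the same one-schema-per-folder assumption explicit; a sketch with a placeholder bucket path, assuming the usual glueContext from the generated script:

# Equivalent direct-from-S3 read of a single raw-zone folder: everything
# under the path is parsed against one inferred csv schema.
datasource0 = glueContext.create_dynamic_frame.from_options(
    connection_type = "s3",
    connection_options = {"paths": ["s3://RAW-ZONE-BUCKET/RAW_ZONE_w1cqzldd5jpe/"]},  # placeholder bucket
    format = "csv",
    format_options = {"withHeader": True},
)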

I created an ETL job with the following logic: loop through every table in my catalog database and write each one out as parquet files into a new folder with the same name as the table (so that I can crawl the tables and query them with Athena).

import boto3

# Assumes the standard Glue job boilerplate from the generated script
# (glueContext and job already initialised) precedes this block.
client = boto3.client('glue')  # Glue Data Catalog API client

databaseName = 'DATABASE'
Tables = client.get_tables(DatabaseName = databaseName)
tableList = Tables['TableList']
for table in tableList:
    tableName = table['Name']
    # Read each catalog table and write it out as parquet into its own folder, named after the table
    datasource0 = glueContext.create_dynamic_frame.from_catalog(database = databaseName, table_name = tableName, transformation_ctx = "datasource0")
    datasink4 = glueContext.write_dynamic_frame.from_options(frame = datasource0, connection_type = "s3", connection_options = {"path": "s3://processed-45ah4xoyqr1b/Application1/" + tableName + "/"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()
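One caveat: get_tables is paginated, so with a large number of tables a single call can miss some. boto3's paginator handles that; a sketch:

import boto3

client = boto3.client('glue')

# Iterate over every page of results instead of a single get_tables call.
paginator = client.get_paginator('get_tables')
for page in paginator.paginate(DatabaseName = 'DATABASE'):
    for table in page['TableList']:
        print(table['Name'])  # substitute the parquet-conversion logic above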