将 CSV 读入 AWS Glue 并加入数据目录中的数据

Read CSV into AWS Glue and join with data from Data Catalogue

我对 AWS Glue 还很陌生,想了解如何执行以下操作:

  1. 从 AWS Glue
  2. 中的 URL 中提取 CSV 文件
  3. 将数据集与我在数据目录中的 table 中的列相结合。
  4. 将其作为新 table 写回数据目录。

到目前为止我有这个:

  tableA_DF = pandas.read_cv("https://example.com/file.csv")
  tableB = glueContext.create_dynamic_frame.from_catalog(database=Z, table_name = Y)
  tableB_DF = TableB.toDF()

但是,我不确定如何加入两者。想简单地从 tableB 添加一列到 tableA,然后将结果存储在数据目录中。

您可以使用 create_dynamic_frame_from_options 从 S3 直接读入 Dynamicframe。

Lookupdata  = GlueContext.create_dynamic_frame_from_options(connection_type="s3",connection_options = {"paths":[InputLookupDir]},format="csv",format_options={"withHeader": True,"separator": ",","quoteChar": '"',"escaper": '"'})

然后使用如下所示从目录中读取数据:

datasource0 = GlueContext.create_dynamic_frame.from_catalog(database = Glue_catalog_database, table_name = Glue_table_name, transformation_ctx = "datasource0")

然后您可以使用 join 连接两个框架,然后将其写回目录。

[joined_dyf][1] = Join.apply(Lookupdata,Join.apply(datasource0, Lookupdata,'someidfrom_datasource0','someidfrom_Lookupdata')

现在将此写回到目录中作为 table 参考 this 了解更多信息:

sink = glueContext.getSink(connection_type="s3", path="s3://path/to/data",
                           enableUpdateCatalog=True, updateBehavior="UPDATE_IN_DATABASE",
                           partitionKeys=["partition_key0", "partition_key1"])
sink.setFormat("<format>")
sink.setCatalogInfo(catalogDatabase=<dst_db_name>, catalogTableName=<dst_tbl_name>)
sink.writeFrame(last_transform)