将 CSV 读入 AWS Glue 并加入数据目录中的数据
Read CSV into AWS Glue and join with data from Data Catalogue
我对 AWS Glue 还很陌生,想了解如何执行以下操作:
- 从 AWS Glue
中的 URL 中提取 CSV 文件
- 将数据集与我在数据目录中的 table 中的列相结合。
- 将其作为新 table 写回数据目录。
到目前为止我有这个:
tableA_DF = pandas.read_cv("https://example.com/file.csv")
tableB = glueContext.create_dynamic_frame.from_catalog(database=Z, table_name = Y)
tableB_DF = TableB.toDF()
但是,我不确定如何加入两者。想简单地从 tableB 添加一列到 tableA,然后将结果存储在数据目录中。
您可以使用 create_dynamic_frame_from_options
从 S3 直接读入 Dynamicframe。
Lookupdata = GlueContext.create_dynamic_frame_from_options(connection_type="s3",connection_options = {"paths":[InputLookupDir]},format="csv",format_options={"withHeader": True,"separator": ",","quoteChar": '"',"escaper": '"'})
然后使用如下所示从目录中读取数据:
datasource0 = GlueContext.create_dynamic_frame.from_catalog(database = Glue_catalog_database, table_name = Glue_table_name, transformation_ctx = "datasource0")
然后您可以使用 join 连接两个框架,然后将其写回目录。
[joined_dyf][1] = Join.apply(Lookupdata,Join.apply(datasource0, Lookupdata,'someidfrom_datasource0','someidfrom_Lookupdata')
现在将此写回到目录中作为 table 参考 this 了解更多信息:
sink = glueContext.getSink(connection_type="s3", path="s3://path/to/data",
enableUpdateCatalog=True, updateBehavior="UPDATE_IN_DATABASE",
partitionKeys=["partition_key0", "partition_key1"])
sink.setFormat("<format>")
sink.setCatalogInfo(catalogDatabase=<dst_db_name>, catalogTableName=<dst_tbl_name>)
sink.writeFrame(last_transform)
我对 AWS Glue 还很陌生,想了解如何执行以下操作:
- 从 AWS Glue 中的 URL 中提取 CSV 文件
- 将数据集与我在数据目录中的 table 中的列相结合。
- 将其作为新 table 写回数据目录。
到目前为止我有这个:
tableA_DF = pandas.read_cv("https://example.com/file.csv")
tableB = glueContext.create_dynamic_frame.from_catalog(database=Z, table_name = Y)
tableB_DF = TableB.toDF()
但是,我不确定如何加入两者。想简单地从 tableB 添加一列到 tableA,然后将结果存储在数据目录中。
您可以使用 create_dynamic_frame_from_options
从 S3 直接读入 Dynamicframe。
Lookupdata = GlueContext.create_dynamic_frame_from_options(connection_type="s3",connection_options = {"paths":[InputLookupDir]},format="csv",format_options={"withHeader": True,"separator": ",","quoteChar": '"',"escaper": '"'})
然后使用如下所示从目录中读取数据:
datasource0 = GlueContext.create_dynamic_frame.from_catalog(database = Glue_catalog_database, table_name = Glue_table_name, transformation_ctx = "datasource0")
然后您可以使用 join 连接两个框架,然后将其写回目录。
[joined_dyf][1] = Join.apply(Lookupdata,Join.apply(datasource0, Lookupdata,'someidfrom_datasource0','someidfrom_Lookupdata')
现在将此写回到目录中作为 table 参考 this 了解更多信息:
sink = glueContext.getSink(connection_type="s3", path="s3://path/to/data",
enableUpdateCatalog=True, updateBehavior="UPDATE_IN_DATABASE",
partitionKeys=["partition_key0", "partition_key1"])
sink.setFormat("<format>")
sink.setCatalogInfo(catalogDatabase=<dst_db_name>, catalogTableName=<dst_tbl_name>)
sink.writeFrame(last_transform)