How can an AWS Glue job upload several tables to Redshift?

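Is it possible to load multiple tables into Redshift using an AWS Glue job?

These are the steps I followed.

  1. Crawled the JSON from S3, and the data has been converted into a Data Catalog table.
  2. I created a job that uploads the Data Catalog table to Redshift, but it only lets me upload 1 table per job. In the job properties (Add job), the "This job runs" option I selected is: A proposed script generated by AWS Glue.

I am not familiar with Python and I am new to AWS Glue, but I have several tables that need to be uploaded.

Here is a sample script: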

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "sampledb", table_name = "abs", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "sampledb", table_name = "abs", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("value", "int", "value", "int"), ("sex", "string", "sex", "string"), ("age", "string", "age", "string"), ("highest year of school completed", "string", "highest year of school completed", "string"), ("state", "string", "state", "string"), ("region type", "string", "region type", "string"), ("lga 2011", "string", "lga 2011", "string"), ("frequency", "string", "frequency", "string"), ("time", "string", "time", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("value", "int", "value", "int"), ("sex", "string", "sex", "string"), ("age", "string", "age", "string"), ("highest year of school completed", "string", "highest year of school completed", "string"), ("state", "string", "state", "string"), ("region type", "string", "region type", "string"), ("lga 2011", "string", "lga 2011", "string"), ("frequency", "string", "frequency", "string"), ("time", "string", "time", "string")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_cols", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [catalog_connection = "redshift", connection_options = {"dbtable": "abs", "database": "dbmla"}, redshift_tmp_dir = TempDir, transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields3, catalog_connection = "redshift", connection_options = {"dbtable": "abs", "database": "dbmla"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")
job.commit()

AWS Glue database: sampledb
Table name in AWS Glue: abs
Redshift database: dbmla

Please provide an example of how to upload them. Thanks!

According to the AWS Glue FAQ, you can modify the generated code and then run the job.

Q: How can I customize the ETL code generated by AWS Glue?

AWS Glue’s ETL script recommendation system generates Scala or Python code. It leverages Glue’s custom ETL library to simplify access to data sources as well as manage job execution. You can find more details about the library in our documentation. You can write ETL code using AWS Glue’s custom library or write arbitrary code in Scala or Python by using inline editing via the AWS Glue Console script editor, downloading the auto-generated code, and editing it in your own IDE. You can also start with one of the many samples hosted in our Github repository and customize that code.

So try adding code snippets for the other tables to the same script, as shown below:

# Second table. "abs2" is a placeholder; replace it with your own table name.
datasource_abs2 = glueContext.create_dynamic_frame.from_catalog(database = "sampledb", table_name = "abs2", transformation_ctx = "datasource_abs2")
applymapping_abs2 = ApplyMapping.apply(frame = datasource_abs2, mappings = [...], transformation_ctx = "applymapping_abs2")  # replace [...] with this table's column mappings
resolvechoice_abs2 = ResolveChoice.apply(frame = applymapping_abs2, choice = "make_cols", transformation_ctx = "resolvechoice_abs2")
dropnullfields_abs2 = DropNullFields.apply(frame = resolvechoice_abs2, transformation_ctx = "dropnullfields_abs2")
datasink_abs2 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields_abs2, catalog_connection = "redshift", connection_options = {"dbtable": "abs2", "database": "dbmla"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink_abs2")

# Third table. "abs3" is a placeholder table name.
datasource_abs3 = glueContext.create_dynamic_frame.from_catalog(database = "sampledb", table_name = "abs3", transformation_ctx = "datasource_abs3")
applymapping_abs3 = ApplyMapping.apply(frame = datasource_abs3, mappings = [...], transformation_ctx = "applymapping_abs3")  # replace [...] with this table's column mappings
resolvechoice_abs3 = ResolveChoice.apply(frame = applymapping_abs3, choice = "make_cols", transformation_ctx = "resolvechoice_abs3")
dropnullfields_abs3 = DropNullFields.apply(frame = resolvechoice_abs3, transformation_ctx = "dropnullfields_abs3")
datasink_abs3 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields_abs3, catalog_connection = "redshift", connection_options = {"dbtable": "abs3", "database": "dbmla"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink_abs3")

# Fourth table. "abs4" is a placeholder table name.
datasource_abs4 = glueContext.create_dynamic_frame.from_catalog(database = "sampledb", table_name = "abs4", transformation_ctx = "datasource_abs4")
applymapping_abs4 = ApplyMapping.apply(frame = datasource_abs4, mappings = [...], transformation_ctx = "applymapping_abs4")  # replace [...] with this table's column mappings
resolvechoice_abs4 = ResolveChoice.apply(frame = applymapping_abs4, choice = "make_cols", transformation_ctx = "resolvechoice_abs4")
dropnullfields_abs4 = DropNullFields.apply(frame = resolvechoice_abs4, transformation_ctx = "dropnullfields_abs4")
datasink_abs4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields_abs4, catalog_connection = "redshift", connection_options = {"dbtable": "abs4", "database": "dbmla"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink_abs4")

# Commit once, after all tables have been written.
job.commit()

Adjust the variable names accordingly so that each one is unique. Thanks.
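
If there are many tables, the same read-and-write steps could also be driven from a list of table names instead of being copied per table. The following is only a minimal sketch under some assumptions: `load_tables` is a hypothetical helper (not part of the Glue library), it assumes each Redshift table should have the same name as its Data Catalog table, and it leaves out the ApplyMapping, ResolveChoice and DropNullFields steps, which would need per-table mappings.

```python
def load_tables(glue_context, table_names, catalog_db, redshift_db,
                connection, tmp_dir):
    """Copy each Data Catalog table into a Redshift table of the same name.

    Hypothetical helper: glue_context is the GlueContext created at the top
    of the job script; the transforms from the generated script are omitted.
    """
    loaded = []
    for name in table_names:
        # Read one catalog table; the transformation_ctx is unique per table.
        frame = glue_context.create_dynamic_frame.from_catalog(
            database=catalog_db,
            table_name=name,
            transformation_ctx="datasource_" + name,
        )
        # Write it to Redshift through the catalog connection.
        glue_context.write_dynamic_frame.from_jdbc_conf(
            frame=frame,
            catalog_connection=connection,
            connection_options={"dbtable": name, "database": redshift_db},
            redshift_tmp_dir=tmp_dir,
            transformation_ctx="datasink_" + name,
        )
        loaded.append(name)
    return loaded
```

In the job script this would be called once before job.commit(), for example load_tables(glueContext, ["abs", "abs2"], "sampledb", "dbmla", "redshift", args["TempDir"]), where "abs2" stands in for your other table names.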