AWS Glue 数据从 S3 迁移到 Redshift
AWS Glue Data moving from S3 to Redshift
我在一个 S3 存储桶中有大约 70 个表,我想使用胶水将它们移动到红移。我只能移动几张桌子。其余的有数据类型问题。 Redshift 不接受某些数据类型。我在一组逐个移动表格的代码中解决了这个问题:
table1 = glueContext.create_dynamic_frame.from_catalog(
database="db1_g", table_name="table1"
)
table1 = table1.resolveChoice(
specs=[
("column1", "cast:char"),
("column2", "cast:varchar"),
("column3", "cast:varchar"),
]
)
table1 = glueContext.write_dynamic_frame.from_jdbc_conf(
frame=table1,
catalog_connection="redshift",
connection_options={"dbtable": "schema1.table1", "database": "db1"},
redshift_tmp_dir=args["TempDir"],
transformation_ctx="table1",
)
相同的脚本用于具有数据类型更改问题的所有其他表。
但是,因为我想自动化脚本,所以我使用了循环表脚本,它遍历所有表并将它们写入 redshift。我有 2 个与此脚本相关的问题。
- 无法将表移动到 redshift 中的相应模式。
- 无法在循环脚本中为那些需要更改数据类型的表添加 if 条件。
client = boto3.client("glue", region_name="us-east-1")
databaseName = "db1_g"
Tables = client.get_tables(DatabaseName=databaseName)
tableList = Tables["TableList"]
for table in tableList:
tableName = table["Name"]
datasource0 = glueContext.create_dynamic_frame.from_catalog(
database="db1_g", table_name=tableName, transformation_ctx="datasource0"
)
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(
frame=datasource0,
catalog_connection="redshift",
connection_options={
"dbtable": tableName,
"database": "schema1.db1",
},
redshift_tmp_dir=args["TempDir"],
transformation_ctx="datasink4",
)
job.commit()
像这样将 redshift 模式名称与 tableName
一起提及:schema1.tableName
抛出错误 schema1 is not defined
.
任何人都可以在循环脚本本身内帮助更改所有需要相同数据类型的表吗?
所以第一个问题很容易解决。架构属于 dbtable
属性而不是 database
,如下所示:
connection_options={
"dbtable": f"schema1.{tableName},
"database": "db1",
}
你的第二个问题是你想在 for 循环中调用 resolveChoice
,对吗?那里发生什么样的错误?为什么不起作用?
我在一个 S3 存储桶中有大约 70 个表,我想使用胶水将它们移动到红移。我只能移动几张桌子。其余的有数据类型问题。 Redshift 不接受某些数据类型。我在一组逐个移动表格的代码中解决了这个问题:
table1 = glueContext.create_dynamic_frame.from_catalog(
database="db1_g", table_name="table1"
)
table1 = table1.resolveChoice(
specs=[
("column1", "cast:char"),
("column2", "cast:varchar"),
("column3", "cast:varchar"),
]
)
table1 = glueContext.write_dynamic_frame.from_jdbc_conf(
frame=table1,
catalog_connection="redshift",
connection_options={"dbtable": "schema1.table1", "database": "db1"},
redshift_tmp_dir=args["TempDir"],
transformation_ctx="table1",
)
相同的脚本用于具有数据类型更改问题的所有其他表。 但是,因为我想自动化脚本,所以我使用了循环表脚本,它遍历所有表并将它们写入 redshift。我有 2 个与此脚本相关的问题。
- 无法将表移动到 redshift 中的相应模式。
- 无法在循环脚本中为那些需要更改数据类型的表添加 if 条件。
client = boto3.client("glue", region_name="us-east-1")
databaseName = "db1_g"
Tables = client.get_tables(DatabaseName=databaseName)
tableList = Tables["TableList"]
for table in tableList:
tableName = table["Name"]
datasource0 = glueContext.create_dynamic_frame.from_catalog(
database="db1_g", table_name=tableName, transformation_ctx="datasource0"
)
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(
frame=datasource0,
catalog_connection="redshift",
connection_options={
"dbtable": tableName,
"database": "schema1.db1",
},
redshift_tmp_dir=args["TempDir"],
transformation_ctx="datasink4",
)
job.commit()
像这样将 redshift 模式名称与 tableName
一起提及:schema1.tableName
抛出错误 schema1 is not defined
.
任何人都可以在循环脚本本身内帮助更改所有需要相同数据类型的表吗?
所以第一个问题很容易解决。架构属于 dbtable
属性而不是 database
,如下所示:
connection_options={
"dbtable": f"schema1.{tableName},
"database": "db1",
}
你的第二个问题是你想在 for 循环中调用 resolveChoice
,对吗?那里发生什么样的错误?为什么不起作用?