AWS Glue 完成后,如何执行一个SQL脚本或存储过程?
After AWS Glue is finished, how to execute a SQL script or stored procedure?
我正在学习 AWS Glue。对于传统的 ETL,一个常见的模式是从目标 table 中查找主键,以决定是否需要进行更新或插入(也称为更新插入设计模式)。使用胶水似乎没有相同的控件。简单写出动态帧只是一个插入过程。我可以想到两种设计模式来解决这个问题:
- 将目标加载为数据框并在 spark 中,左外连接仅插入新行(如果需要,您将如何更新行?删除然后插入???因为我是 spark 的新手,这是最对我来说很陌生)
- 将数据加载到阶段table,然后使用SQL执行最后的合并
我首先探索的是第二种方法。 AWS Glue 作业完成后,我如何在 AWS 世界中执行 SQL 脚本或存储过程?你做 python-shell 工作,lambda,直接作为胶水的一部分,还是其他方式?
我使用 pymysql 库作为 zip 文件上传到 AWS S3,并在 AWS Glue 作业参数中配置。对于 UPSERT,我使用了 INSERT INTO TABLE.....ON DUPLICATE KEY.
因此,基于主键验证,代码将更新记录(如果已存在)或插入新记录。希望这可以帮助。请参考:
import pymysql
rds_host = "rds.url.aaa.us-west-2.rds.amazonaws.com"
name = "username"
password = "userpwd"
db_name = "dbname"
conn = pymysql.connect(host=rds_host, user=name, passwd=password, db=db_name, connect_timeout=5)
with conn.cursor() as cur:
insertQry="INSERT INTO ZIP_TERR(zip_code, territory_code, "
"territory_name,state) "
"VALUES(zip_code, territory_code, territory_name, state) "
"ON DUPLICATE KEY UPDATE territory_name = "
"VALUES(territory_name), state = VALUES(state);"
cur.execute(insertQry)
conn.commit()
cur.close()
在上面的代码示例中,territory-code, zip-code 是主键。也请参考这里:
一如既往,AWS 不断变化的功能列表解决了其中的大部分问题(由用户需求和常见工作模式引起)。
AWS 已发布有关 Updating and Inserting new data 的文档,使用暂存表(您在第二个策略中提到)。
一般来说,ETL 最严格的方法是截断并重新加载源数据,但这取决于您的源数据。如果您的源数据是跨越数十亿条记录的时间序列数据集,您可能需要使用 delta/incremental 加载模式。
我正在学习 AWS Glue。对于传统的 ETL,一个常见的模式是从目标 table 中查找主键,以决定是否需要进行更新或插入(也称为更新插入设计模式)。使用胶水似乎没有相同的控件。简单写出动态帧只是一个插入过程。我可以想到两种设计模式来解决这个问题:
- 将目标加载为数据框并在 spark 中,左外连接仅插入新行(如果需要,您将如何更新行?删除然后插入???因为我是 spark 的新手,这是最对我来说很陌生)
- 将数据加载到阶段table,然后使用SQL执行最后的合并
我首先探索的是第二种方法。 AWS Glue 作业完成后,我如何在 AWS 世界中执行 SQL 脚本或存储过程?你做 python-shell 工作,lambda,直接作为胶水的一部分,还是其他方式?
我使用 pymysql 库作为 zip 文件上传到 AWS S3,并在 AWS Glue 作业参数中配置。对于 UPSERT,我使用了 INSERT INTO TABLE.....ON DUPLICATE KEY.
因此,基于主键验证,代码将更新记录(如果已存在)或插入新记录。希望这可以帮助。请参考:
import pymysql
rds_host = "rds.url.aaa.us-west-2.rds.amazonaws.com"
name = "username"
password = "userpwd"
db_name = "dbname"
conn = pymysql.connect(host=rds_host, user=name, passwd=password, db=db_name, connect_timeout=5)
with conn.cursor() as cur:
insertQry="INSERT INTO ZIP_TERR(zip_code, territory_code, "
"territory_name,state) "
"VALUES(zip_code, territory_code, territory_name, state) "
"ON DUPLICATE KEY UPDATE territory_name = "
"VALUES(territory_name), state = VALUES(state);"
cur.execute(insertQry)
conn.commit()
cur.close()
在上面的代码示例中,territory-code, zip-code 是主键。也请参考这里:
一如既往,AWS 不断变化的功能列表解决了其中的大部分问题(由用户需求和常见工作模式引起)。
AWS 已发布有关 Updating and Inserting new data 的文档,使用暂存表(您在第二个策略中提到)。
一般来说,ETL 最严格的方法是截断并重新加载源数据,但这取决于您的源数据。如果您的源数据是跨越数十亿条记录的时间序列数据集,您可能需要使用 delta/incremental 加载模式。