如何使用 AWS Glue 和其他 AWS 服务从 Oracle 数据库中提取数据
How to extract data from Oracle database with AWS Glue and other AWS services
我不熟悉 AWS glue 和其他 AWS 东西。我有一个为项目构建ETL框架的需求。
这是 high-level 图。我想了解,不是创建 400 个胶水管道,我可以创建一个模板类型的东西,它是由来自 postgres aurora/mysql 的参考数据驱动的。我熟悉Python。
有人对此有任何想法吗?任何参考、代码示例。
- 我们的 mysql 数据库中有一个配置主机 table。为了方便起见,我们使用 source_table_name 作为标识符来获取适当的 table 列 names/queries 用于创建 STG TABLE、将数据加载到 STG TABLE、INSERT/UPDATE 进入目标 TABLEs 等
- 我们还将 INSERT/UPDATE 拆分为主配置文件中的两个不同列,因为我们使用 ON DUPLICATE KEY 更新现有记录。
- 通过处理将具有登陆文件名的 lambda 事件,获取源 table 名称。
- 从配置主机获取源 table 名称所需的所有数据。它将类似于以下内容:
sql_query = "SELECT * FROM {0}.CONFIG_MASTER WHERE src_tbl_name = %s ".format(mydb)
cur.execute(sql_query, (source_fname))
result = cur.fetchall()
for row in result:
stg_table_name = row[1]
tgt_table_name = row[2]
create_stg_table_qry = row[3]
load_data_stg_table_qry = row[4]
insert_tgt_table_qry = row[5]
insert_tgt_table_qry_part_1 = row[6]
insert_tgt_table_qry_part_2 = row[7]
conn.commit()
cur.close()
将适当的参数传递给通用函数,如下所示:
create_stg_table(stg_table_name, create_stg_table_qry, load_data_stg_table_qry)
loaddata(tgt_table_name, insert_tgt_table_qry_part_1, insert_tgt_table_qry_part_2, stg_table_name)
通用函数如下所示,这是针对极光RDS的,请根据需要进行更改。
def create_stg_table(stg_table_name, create_stg_table_qry, load_data_stg_table_qry):
cur, conn = connect()
createStgTable1 = "DROP TABLE IF EXISTS {0}.{1}".format(mydb, stg_table_name)
createStgTable2 = "CREATE TABLE {0}.{1} {2}".format(mydb, stg_table_name, create_stg_table_qry)
loadQry = "LOAD DATA FROM S3 PREFIX 's3://' REPLACE INTO TABLE ...".format()
cur.execute(createStgTable1)
cur.execute(createStgTable2)
cur.execute(loadQry)
conn.commit()
conn.close()
def loaddata(tgt_table_name, insert_tgt_table_qry_part_1, insert_tgt_table_qry_part_2, stg_table_name):
cur, conn = connect()
insertQry = "INSERT INTO target table, from the staging table query here"
print(insertQry)
cur.execute(insertQry)
conn.commit()
conn.close()
希望这能提供一个想法。
谢谢
我不熟悉 AWS glue 和其他 AWS 东西。我有一个为项目构建ETL框架的需求。 这是 high-level 图。我想了解,不是创建 400 个胶水管道,我可以创建一个模板类型的东西,它是由来自 postgres aurora/mysql 的参考数据驱动的。我熟悉Python。 有人对此有任何想法吗?任何参考、代码示例。
- 我们的 mysql 数据库中有一个配置主机 table。为了方便起见,我们使用 source_table_name 作为标识符来获取适当的 table 列 names/queries 用于创建 STG TABLE、将数据加载到 STG TABLE、INSERT/UPDATE 进入目标 TABLEs 等
- 我们还将 INSERT/UPDATE 拆分为主配置文件中的两个不同列,因为我们使用 ON DUPLICATE KEY 更新现有记录。
- 通过处理将具有登陆文件名的 lambda 事件,获取源 table 名称。
- 从配置主机获取源 table 名称所需的所有数据。它将类似于以下内容:
sql_query = "SELECT * FROM {0}.CONFIG_MASTER WHERE src_tbl_name = %s ".format(mydb) cur.execute(sql_query, (source_fname)) result = cur.fetchall() for row in result: stg_table_name = row[1] tgt_table_name = row[2] create_stg_table_qry = row[3] load_data_stg_table_qry = row[4] insert_tgt_table_qry = row[5] insert_tgt_table_qry_part_1 = row[6] insert_tgt_table_qry_part_2 = row[7] conn.commit() cur.close()
将适当的参数传递给通用函数,如下所示:
create_stg_table(stg_table_name, create_stg_table_qry, load_data_stg_table_qry)
loaddata(tgt_table_name, insert_tgt_table_qry_part_1, insert_tgt_table_qry_part_2, stg_table_name)
通用函数如下所示,这是针对极光RDS的,请根据需要进行更改。
def create_stg_table(stg_table_name, create_stg_table_qry, load_data_stg_table_qry):
cur, conn = connect()
createStgTable1 = "DROP TABLE IF EXISTS {0}.{1}".format(mydb, stg_table_name)
createStgTable2 = "CREATE TABLE {0}.{1} {2}".format(mydb, stg_table_name, create_stg_table_qry)
loadQry = "LOAD DATA FROM S3 PREFIX 's3://' REPLACE INTO TABLE ...".format()
cur.execute(createStgTable1)
cur.execute(createStgTable2)
cur.execute(loadQry)
conn.commit()
conn.close()
def loaddata(tgt_table_name, insert_tgt_table_qry_part_1, insert_tgt_table_qry_part_2, stg_table_name):
cur, conn = connect()
insertQry = "INSERT INTO target table, from the staging table query here"
print(insertQry)
cur.execute(insertQry)
conn.commit()
conn.close()
希望这能提供一个想法。
谢谢