如何使用python在数据库之间进行ETL?
How to use python to ETL between databases?
使用 psycopg2,我能够在一个 PostgreSQL 数据库连接中 select 来自 table 的数据,并将其插入第二个 Postgre 中的 table SQL 数据库连接。
但是,我只能通过设置我想要提取的确切特征,并为我要插入的每一列写出单独的变量来做到这一点。
有谁知道以下两者的良好做法:
- 在数据库之间移动整个 table,或
- 迭代功能,而不必为要移动的每一列声明变量
- 或者...?
这是我目前正在使用的脚本,您可以在其中看到 select 特定特征的离子,以及变量的创建(它有效,但这不是一个实用的方法):
import psycopg2
connDev = psycopg2.connect("host=host1 dbname=dbname1 user=postgres password=*** ")
connQa = psycopg2.connect("host=host2 dbname=dbname2 user=postgres password=*** ")
curDev = connDev.cursor()
curQa = connQa.cursor()
sql = ('INSERT INTO "tempHoods" (nbhd_name, geom) values (%s, %s);')
curDev.execute('select cast(geom as varchar) from "CCD_Neighborhoods" where nbhd_id = 11;')
tempGeom = curDev.fetchone()
curDev.execute('select nbhd_name from "CCD_Neighborhoods" where nbhd_id = 11;')
tempName = curDev.fetchone()
data = (tempName, tempGeom)
curQa.execute (sql, data)
#commit transactions
connDev.commit()
connQa.commit()
#close connections
curDev.close()
curQa.close()
connDev.close()
connQa.close()
另一个注意事项是 python 允许显式使用 SQL 函数/数据类型转换,这对我们来说很重要,因为我们使用 GEOMETRY 数据类型。在上方您可以看到我将其转换为 TEXT,然后将其转储到源 table 中的现有几何列中 - 这将适用于 MSSQL 服务器,这是地理空间社区中的一个重要功能。 ..
这是我根据 Dmitry 的出色解决方案更新的代码:
import psycopg2
connDev = psycopg2.connect("host=host1 dbname=dpspgisdev user=postgres password=****")
connQa = psycopg2.connect("host=host2 dbname=dpspgisqa user=postgres password=****")
curDev = connDev.cursor()
curQa = connQa.cursor()
print "Truncating Source"
curQa.execute('delete from "tempHoods"')
connQa.commit()
#Get Data
curDev.execute('select nbhd_id, nbhd_name, typology, notes, cast(geom as varchar) from "CCD_Neighborhoods";') #cast geom to varchar and insert into geometry column!
rows = curDev.fetchall()
sql_insert = 'INSERT INTO "tempHoods" (nbhd_id, nbhd_name, typology, notes, geom) values '
sql_values = ['(%s, %s, %s, %s, %s)'] #number of columns selecting / inserting
data_values = []
batch_size = 1000 #customize for size of tables...
sql_stmt = sql_insert + ','.join(sql_values*batch_size) + ';'
for i, row in enumerate(rows, 1):
data_values += row[:5] #relates to number of columns (%s)
if i % batch_size == 0:
curQa.execute (sql_stmt , data_values )
connQa.commit()
print "Inserting..."
data_values = []
if (i % batch_size != 0):
sql_stmt = sql_insert + ','.join(sql_values*(i % batch_size)) + ';'
curQa.execute (sql_stmt, data_values)
print "Last Values..."
connQa.commit()
# close connections
curDev.close()
curQa.close()
connDev.close()
connQa.close()
在你的解决方案中(你的解决方案和你的问题有不同的语句顺序)将以“sql =
”开头的行和“#commit transactions
”评论之前的循环更改为
sql_insert = 'INSERT INTO "tempHoods" (nbhd_id, nbhd_name, typology, notes, geom) values '
sql_values = ['(%s, %s, %s, %s, %s)']
data_values = []
# you can make this larger if you want
# ...try experimenting to see what works best
batch_size = 100
sql_stmt = sql_insert + ','.join(sql_values*batch_size) + ';'
for i, row in enumerate(rows, 1):
data_values += row[:5]
if i % batch_size == 0:
curQa.execute (sql_stmt , data_values )
data_values = []
if (i % batch_size != 0):
sql_stmt = sql_insert + ','.join(sql_values*(i % batch_size)) + ';'
curQa.execute (sql_stmt , data_values )
顺便说一句,我认为您不需要承诺。您不开始任何交易。因此,没有必要提交它们。当然,如果您所做的只是一堆选择,则不需要提交游标。
使用 psycopg2,我能够在一个 PostgreSQL 数据库连接中 select 来自 table 的数据,并将其插入第二个 Postgre 中的 table SQL 数据库连接。
但是,我只能通过设置我想要提取的确切特征,并为我要插入的每一列写出单独的变量来做到这一点。
有谁知道以下两者的良好做法:
- 在数据库之间移动整个 table,或
- 迭代功能,而不必为要移动的每一列声明变量
- 或者...?
这是我目前正在使用的脚本,您可以在其中看到 select 特定特征的离子,以及变量的创建(它有效,但这不是一个实用的方法):
import psycopg2
connDev = psycopg2.connect("host=host1 dbname=dbname1 user=postgres password=*** ")
connQa = psycopg2.connect("host=host2 dbname=dbname2 user=postgres password=*** ")
curDev = connDev.cursor()
curQa = connQa.cursor()
sql = ('INSERT INTO "tempHoods" (nbhd_name, geom) values (%s, %s);')
curDev.execute('select cast(geom as varchar) from "CCD_Neighborhoods" where nbhd_id = 11;')
tempGeom = curDev.fetchone()
curDev.execute('select nbhd_name from "CCD_Neighborhoods" where nbhd_id = 11;')
tempName = curDev.fetchone()
data = (tempName, tempGeom)
curQa.execute (sql, data)
#commit transactions
connDev.commit()
connQa.commit()
#close connections
curDev.close()
curQa.close()
connDev.close()
connQa.close()
另一个注意事项是 python 允许显式使用 SQL 函数/数据类型转换,这对我们来说很重要,因为我们使用 GEOMETRY 数据类型。在上方您可以看到我将其转换为 TEXT,然后将其转储到源 table 中的现有几何列中 - 这将适用于 MSSQL 服务器,这是地理空间社区中的一个重要功能。 ..
这是我根据 Dmitry 的出色解决方案更新的代码:
import psycopg2
connDev = psycopg2.connect("host=host1 dbname=dpspgisdev user=postgres password=****")
connQa = psycopg2.connect("host=host2 dbname=dpspgisqa user=postgres password=****")
curDev = connDev.cursor()
curQa = connQa.cursor()
print "Truncating Source"
curQa.execute('delete from "tempHoods"')
connQa.commit()
#Get Data
curDev.execute('select nbhd_id, nbhd_name, typology, notes, cast(geom as varchar) from "CCD_Neighborhoods";') #cast geom to varchar and insert into geometry column!
rows = curDev.fetchall()
sql_insert = 'INSERT INTO "tempHoods" (nbhd_id, nbhd_name, typology, notes, geom) values '
sql_values = ['(%s, %s, %s, %s, %s)'] #number of columns selecting / inserting
data_values = []
batch_size = 1000 #customize for size of tables...
sql_stmt = sql_insert + ','.join(sql_values*batch_size) + ';'
for i, row in enumerate(rows, 1):
data_values += row[:5] #relates to number of columns (%s)
if i % batch_size == 0:
curQa.execute (sql_stmt , data_values )
connQa.commit()
print "Inserting..."
data_values = []
if (i % batch_size != 0):
sql_stmt = sql_insert + ','.join(sql_values*(i % batch_size)) + ';'
curQa.execute (sql_stmt, data_values)
print "Last Values..."
connQa.commit()
# close connections
curDev.close()
curQa.close()
connDev.close()
connQa.close()
在你的解决方案中(你的解决方案和你的问题有不同的语句顺序)将以“sql =
”开头的行和“#commit transactions
”评论之前的循环更改为
sql_insert = 'INSERT INTO "tempHoods" (nbhd_id, nbhd_name, typology, notes, geom) values '
sql_values = ['(%s, %s, %s, %s, %s)']
data_values = []
# you can make this larger if you want
# ...try experimenting to see what works best
batch_size = 100
sql_stmt = sql_insert + ','.join(sql_values*batch_size) + ';'
for i, row in enumerate(rows, 1):
data_values += row[:5]
if i % batch_size == 0:
curQa.execute (sql_stmt , data_values )
data_values = []
if (i % batch_size != 0):
sql_stmt = sql_insert + ','.join(sql_values*(i % batch_size)) + ';'
curQa.execute (sql_stmt , data_values )
顺便说一句,我认为您不需要承诺。您不开始任何交易。因此,没有必要提交它们。当然,如果您所做的只是一堆选择,则不需要提交游标。