如何使用python在数据库之间进行ETL?

How to use python to ETL between databases?

使用 psycopg2,我能够在一个 PostgreSQL 数据库连接中 select 来自 table 的数据,并将其插入第二个 Postgre 中的 table SQL 数据库连接。

但是,我只能通过设置我想要提取的确切特征,并为我要插入的每一列写出单独的变量来做到这一点。

有谁知道以下两者的良好做法:

这是我目前正在使用的脚本,您可以在其中看到 select 特定特征的离子,以及变量的创建(它有效,但这不是一个实用的方法):

import psycopg2

connDev = psycopg2.connect("host=host1 dbname=dbname1 user=postgres password=*** ")
connQa = psycopg2.connect("host=host2 dbname=dbname2 user=postgres password=*** ")
curDev = connDev.cursor()
curQa = connQa.cursor()

sql = ('INSERT INTO "tempHoods" (nbhd_name, geom) values (%s, %s);')

curDev.execute('select cast(geom as varchar) from "CCD_Neighborhoods" where nbhd_id = 11;')
tempGeom = curDev.fetchone()

curDev.execute('select nbhd_name from "CCD_Neighborhoods" where nbhd_id = 11;')
tempName = curDev.fetchone()

data = (tempName, tempGeom)

curQa.execute (sql, data)


#commit transactions
connDev.commit()
connQa.commit()

#close connections
curDev.close()
curQa.close()
connDev.close()
connQa.close()

另一个注意事项是 python 允许显式使用 SQL 函数/数据类型转换,这对我们来说很重要,因为我们使用 GEOMETRY 数据类型。在上方您可以看到我将其转换为 TEXT,然后将其转储到源 table 中的现有几何列中 - 这将适用于 MSSQL 服务器,这是地理空间社区中的一个重要功能。 ..

这是我根据 Dmitry 的出色解决方案更新的代码:

import psycopg2

connDev = psycopg2.connect("host=host1 dbname=dpspgisdev user=postgres password=****")
connQa = psycopg2.connect("host=host2 dbname=dpspgisqa user=postgres password=****")
curDev = connDev.cursor()
curQa = connQa.cursor()

print "Truncating Source"
curQa.execute('delete from "tempHoods"')
connQa.commit()
#Get Data
curDev.execute('select  nbhd_id, nbhd_name, typology, notes, cast(geom as varchar) from "CCD_Neighborhoods";') #cast geom to varchar and insert into geometry column!
rows = curDev.fetchall()


sql_insert = 'INSERT INTO "tempHoods" (nbhd_id, nbhd_name, typology, notes, geom) values '
sql_values = ['(%s, %s, %s, %s, %s)'] #number of columns selecting / inserting


data_values = []

batch_size = 1000 #customize for size of tables... 

sql_stmt = sql_insert + ','.join(sql_values*batch_size) + ';'

for i, row in enumerate(rows, 1):

            data_values += row[:5] #relates to number of columns (%s)
            if i % batch_size == 0:
                curQa.execute (sql_stmt , data_values )
                connQa.commit()
                print "Inserting..."
                data_values = []

if (i % batch_size != 0):
    sql_stmt = sql_insert + ','.join(sql_values*(i % batch_size)) + ';'
    curQa.execute (sql_stmt, data_values)
    print "Last Values..."
    connQa.commit()





# close connections
curDev.close()
curQa.close()
connDev.close()
connQa.close()

在你的解决方案中(你的解决方案和你的问题有不同的语句顺序)将以“sql =”开头的行和“#commit transactions”评论之前的循环更改为

sql_insert = 'INSERT INTO "tempHoods" (nbhd_id, nbhd_name, typology, notes, geom) values '
sql_values = ['(%s, %s, %s, %s, %s)']


data_values = []
# you can make this larger if you want
# ...try experimenting to see what works best
batch_size = 100 
sql_stmt = sql_insert + ','.join(sql_values*batch_size) + ';'
for i, row in enumerate(rows, 1):
    data_values += row[:5]
    if i % batch_size == 0:
        curQa.execute (sql_stmt , data_values )
        data_values = []
if (i % batch_size != 0):
    sql_stmt = sql_insert + ','.join(sql_values*(i % batch_size)) + ';'
    curQa.execute (sql_stmt , data_values )

顺便说一句,我认为您不需要承诺。您不开始任何交易。因此,没有必要提交它们。当然,如果您所做的只是一堆选择,则不需要提交游标。