基本的 pyodbc 批量插入
basic pyodbc bulk insert
在 python 脚本中,我需要 运行 一个数据源的查询,并将该查询的每一行插入到另一个数据源的 table 中。我通常会使用带有 tsql 链接服务器连接的单个 insert/select 语句来执行此操作,但我没有到此特定数据源的链接服务器连接。
我找不到一个简单的 pyodbc 示例。这是我的做法,但我猜在循环内执行插入语句非常慢。
result = ds1Cursor.execute(selectSql)
for row in result:
insertSql = "insert into TableName (Col1, Col2, Col3) values (?, ?, ?)"
ds2Cursor.execute(insertSql, row[0], row[1], row[2])
ds2Cursor.commit()
有没有更好的批量插入pyodbc记录的方法?或者这是一种相对有效的方法吗?我使用的是 SqlServer 2012,以及最新的 pyodbc 和 python 版本。
处理此问题的最佳方法是使用 pyodbc 函数 executemany
。
ds1Cursor.execute(selectSql)
result = ds1Cursor.fetchall()
ds2Cursor.executemany('INSERT INTO [TableName] (Col1, Col2, Col3) VALUES (?, ?, ?)', result)
ds2Cursor.commit()
这是一个可以批量插入 SQL 服务器数据库的函数。
import pyodbc
import contextlib
def bulk_insert(table_name, file_path):
string = "BULK INSERT {} FROM '{}' (WITH FORMAT = 'CSV');"
with contextlib.closing(pyodbc.connect("MYCONN")) as conn:
with contextlib.closing(conn.cursor()) as cursor:
cursor.execute(string.format(table_name, file_path))
conn.commit()
这绝对有效。
更新:我在评论以及定期编码时注意到,pyodbc 比 pypyodbc 得到更好的支持。
新更新:删除 conn.close() 因为 with 语句会自动处理它。
由于 pymssql 库(which seems to be under development again) we started using the cTDS library 由 Zillow 的聪明人开发,令我们惊讶的是它支持 FreeTDS 批量插入。
顾名思义,cTDS 是在 FreeTDS 库之上用 C 编写的,这使得它非常快,非常快。恕我直言,这是批量插入 SQL 服务器的最佳方式,因为 ODBC 驱动程序不支持批量插入,并且建议的 executemany
或 fast_executemany
并不是真正的批量插入操作。 BCP 工具和 T-SQL 批量插入有其局限性,因为它需要文件可以被 SQL 服务器访问,这在许多情况下可能是一个交易破坏者。
下面是批量插入 CSV 文件的简单实现。请原谅我的任何错误,我没有测试就写了这个。
我不知道为什么,但对于使用 Latin1_General_CI_AS 的服务器,我需要用 ctds.SqlVarChar 包装进入 NVarChar 列的数据。 I opened an issue about this but developers said the naming is correct,所以我更改了代码以保持心理健康。
import csv
import ctds
def _to_varchar(txt: str) -> ctds.VARCHAR:
"""
Wraps strings into ctds.NVARCHAR.
"""
if txt == "null":
return None
return ctds.SqlNVarChar(txt)
def _to_nvarchar(txt: str) -> ctds.VARCHAR:
"""
Wraps strings into ctds.VARCHAR.
"""
if txt == "null":
return None
return ctds.SqlVarChar(txt.encode("utf-16le"))
def read(file):
"""
Open CSV File.
Each line is a column:value dict.
https://docs.python.org/3/library/csv.html?highlight=csv#csv.DictReader
"""
with open(file, newline='') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
yield row
def transform(row):
"""
Do transformations to data before loading.
Data specified for bulk insertion into text columns (e.g. VARCHAR,
NVARCHAR, TEXT) is not encoded on the client in any way by FreeTDS.
Because of this behavior it is possible to insert textual data with
an invalid encoding and cause the column data to become corrupted.
To prevent this, it is recommended the caller explicitly wrap the
the object with either ctds.SqlVarChar (for CHAR, VARCHAR or TEXT
columns) or ctds.SqlNVarChar (for NCHAR, NVARCHAR or NTEXT columns).
For non-Unicode columns, the value should be first encoded to
column’s encoding (e.g. latin-1). By default ctds.SqlVarChar will
encode str objects to utf-8, which is likely incorrect for most SQL
Server configurations.
https://zillow.github.io/ctds/bulk_insert.html#text-columns
"""
row["col1"] = _to_datetime(row["col1"])
row["col2"] = _to_int(row["col2"])
row["col3"] = _to_nvarchar(row["col3"])
row["col4"] = _to_varchar(row["col4"])
return row
def load(rows):
stime = time.time()
with ctds.connect(**DBCONFIG) as conn:
with conn.cursor() as curs:
curs.execute("TRUNCATE TABLE MYSCHEMA.MYTABLE")
loaded_lines = conn.bulk_insert("MYSCHEMA.MYTABLE", map(transform, rows))
etime = time.time()
print(loaded_lines, " rows loaded in ", etime - stime)
if __name__ == "__main__":
load(read('data.csv'))
您应该将 executemany
与 cursor.fast_executemany = True
一起使用,以提高性能。
pyodbc
的默认行为是 运行 多次插入,但这是低效的。通过应用 fast_executemany
,您可以显着提高性能。
这是一个例子:
connection = pyodbc.connect('DRIVER={ODBC Driver 17 for SQL Server}',host='host', database='db', user='usr', password='foo')
cursor = connection.cursor()
# I'm the important line
cursor.fast_executemany = True
sql = "insert into TableName (Col1, Col2, Col3) values (?, ?, ?)"
tuples=[('foo','bar', 'ham'), ('hoo','far', 'bam')]
cursor.executemany(sql, tuples)
cursor.commit()
cursor.close()
connection.close()
Docs。
请注意,此功能自 4.0.19 Oct 23, 2017
以来可用
生成 SQL 所需的有用函数 execute_many():
def generate_bulk_insert_sql(self, data:pd.DataFrame, table_name) -> str:
table_sql = str([c for c in data.columns]).replace("'","").replace("[", "").replace("]", "")
return f'INSERT INTO {table_name} ({table_sql}) VALUES ({("?,"*len(data.columns))[:-1]})
在 python 脚本中,我需要 运行 一个数据源的查询,并将该查询的每一行插入到另一个数据源的 table 中。我通常会使用带有 tsql 链接服务器连接的单个 insert/select 语句来执行此操作,但我没有到此特定数据源的链接服务器连接。
我找不到一个简单的 pyodbc 示例。这是我的做法,但我猜在循环内执行插入语句非常慢。
result = ds1Cursor.execute(selectSql)
for row in result:
insertSql = "insert into TableName (Col1, Col2, Col3) values (?, ?, ?)"
ds2Cursor.execute(insertSql, row[0], row[1], row[2])
ds2Cursor.commit()
有没有更好的批量插入pyodbc记录的方法?或者这是一种相对有效的方法吗?我使用的是 SqlServer 2012,以及最新的 pyodbc 和 python 版本。
处理此问题的最佳方法是使用 pyodbc 函数 executemany
。
ds1Cursor.execute(selectSql)
result = ds1Cursor.fetchall()
ds2Cursor.executemany('INSERT INTO [TableName] (Col1, Col2, Col3) VALUES (?, ?, ?)', result)
ds2Cursor.commit()
这是一个可以批量插入 SQL 服务器数据库的函数。
import pyodbc
import contextlib
def bulk_insert(table_name, file_path):
string = "BULK INSERT {} FROM '{}' (WITH FORMAT = 'CSV');"
with contextlib.closing(pyodbc.connect("MYCONN")) as conn:
with contextlib.closing(conn.cursor()) as cursor:
cursor.execute(string.format(table_name, file_path))
conn.commit()
这绝对有效。
更新:我在评论以及定期编码时注意到,pyodbc 比 pypyodbc 得到更好的支持。
新更新:删除 conn.close() 因为 with 语句会自动处理它。
由于 pymssql 库(which seems to be under development again) we started using the cTDS library 由 Zillow 的聪明人开发,令我们惊讶的是它支持 FreeTDS 批量插入。
顾名思义,cTDS 是在 FreeTDS 库之上用 C 编写的,这使得它非常快,非常快。恕我直言,这是批量插入 SQL 服务器的最佳方式,因为 ODBC 驱动程序不支持批量插入,并且建议的 executemany
或 fast_executemany
并不是真正的批量插入操作。 BCP 工具和 T-SQL 批量插入有其局限性,因为它需要文件可以被 SQL 服务器访问,这在许多情况下可能是一个交易破坏者。
下面是批量插入 CSV 文件的简单实现。请原谅我的任何错误,我没有测试就写了这个。
我不知道为什么,但对于使用 Latin1_General_CI_AS 的服务器,我需要用 ctds.SqlVarChar 包装进入 NVarChar 列的数据。 I opened an issue about this but developers said the naming is correct,所以我更改了代码以保持心理健康。
import csv
import ctds
def _to_varchar(txt: str) -> ctds.VARCHAR:
"""
Wraps strings into ctds.NVARCHAR.
"""
if txt == "null":
return None
return ctds.SqlNVarChar(txt)
def _to_nvarchar(txt: str) -> ctds.VARCHAR:
"""
Wraps strings into ctds.VARCHAR.
"""
if txt == "null":
return None
return ctds.SqlVarChar(txt.encode("utf-16le"))
def read(file):
"""
Open CSV File.
Each line is a column:value dict.
https://docs.python.org/3/library/csv.html?highlight=csv#csv.DictReader
"""
with open(file, newline='') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
yield row
def transform(row):
"""
Do transformations to data before loading.
Data specified for bulk insertion into text columns (e.g. VARCHAR,
NVARCHAR, TEXT) is not encoded on the client in any way by FreeTDS.
Because of this behavior it is possible to insert textual data with
an invalid encoding and cause the column data to become corrupted.
To prevent this, it is recommended the caller explicitly wrap the
the object with either ctds.SqlVarChar (for CHAR, VARCHAR or TEXT
columns) or ctds.SqlNVarChar (for NCHAR, NVARCHAR or NTEXT columns).
For non-Unicode columns, the value should be first encoded to
column’s encoding (e.g. latin-1). By default ctds.SqlVarChar will
encode str objects to utf-8, which is likely incorrect for most SQL
Server configurations.
https://zillow.github.io/ctds/bulk_insert.html#text-columns
"""
row["col1"] = _to_datetime(row["col1"])
row["col2"] = _to_int(row["col2"])
row["col3"] = _to_nvarchar(row["col3"])
row["col4"] = _to_varchar(row["col4"])
return row
def load(rows):
stime = time.time()
with ctds.connect(**DBCONFIG) as conn:
with conn.cursor() as curs:
curs.execute("TRUNCATE TABLE MYSCHEMA.MYTABLE")
loaded_lines = conn.bulk_insert("MYSCHEMA.MYTABLE", map(transform, rows))
etime = time.time()
print(loaded_lines, " rows loaded in ", etime - stime)
if __name__ == "__main__":
load(read('data.csv'))
您应该将 executemany
与 cursor.fast_executemany = True
一起使用,以提高性能。
pyodbc
的默认行为是 运行 多次插入,但这是低效的。通过应用 fast_executemany
,您可以显着提高性能。
这是一个例子:
connection = pyodbc.connect('DRIVER={ODBC Driver 17 for SQL Server}',host='host', database='db', user='usr', password='foo')
cursor = connection.cursor()
# I'm the important line
cursor.fast_executemany = True
sql = "insert into TableName (Col1, Col2, Col3) values (?, ?, ?)"
tuples=[('foo','bar', 'ham'), ('hoo','far', 'bam')]
cursor.executemany(sql, tuples)
cursor.commit()
cursor.close()
connection.close()
Docs。 请注意,此功能自 4.0.19 Oct 23, 2017
以来可用生成 SQL 所需的有用函数 execute_many():
def generate_bulk_insert_sql(self, data:pd.DataFrame, table_name) -> str:
table_sql = str([c for c in data.columns]).replace("'","").replace("[", "").replace("]", "")
return f'INSERT INTO {table_name} ({table_sql}) VALUES ({("?,"*len(data.columns))[:-1]})