如何使用 pyodbc 加速批量插入 MS SQL 服务器

How to speed up bulk insert to MS SQL Server using pyodbc

以下是我的代码,我需要一些帮助。 我不得不 运行 它超过 1,300,000 行,这意味着它最多需要 40 分钟 来插入 ~300,000 行。

我认为批量插入是加快速度的途径? 或者是因为我正在通过 for data in reader: 部分遍历行?

#Opens the prepped csv file
with open (os.path.join(newpath,outfile), 'r') as f:
    #hooks csv reader to file
    reader = csv.reader(f)
    #pulls out the columns (which match the SQL table)
    columns = next(reader)
    #trims any extra spaces
    columns = [x.strip(' ') for x in columns]
    #starts SQL statement
    query = 'bulk insert into SpikeData123({0}) values ({1})'
    #puts column names in SQL query 'query'
    query = query.format(','.join(columns), ','.join('?' * len(columns)))

    print 'Query is: %s' % query
    #starts curser from cnxn (which works)
    cursor = cnxn.cursor()
    #uploads everything by row
    for data in reader:
        cursor.execute(query, data)
        cursor.commit()

我有意动态选择我的专栏 headers(因为我想创建尽可能多的 Pythonic 代码)。

SpikeData123 是 table 名称。

是的,批量插入是将大文件加载到数据库中的正确路径。乍一看,我会说花费这么长时间的原因是正如您提到的那样,您正在遍历文件中的每一行数据,这实际上意味着正在消除使用批量插入并使其像普通插入一样的好处。请记住,顾名思义,它用于插入数据块。 我会删除循环并重试。

另外,我会仔细检查您的批量插入语法,因为我觉得它不正确。检查由 pyodbc 生成的 sql,因为我觉得它可能只执行正常的插入

或者,如果它仍然很慢,我会尝试直接从 sql 使用批量插入,然后将整个文件加载到带有批量插入的临时 table,然后将相关列插入右侧 table秒。或者混合使用批量插入和 bcp 来插入特定的列或 OPENROWSET。

更新 - 2022 年 5 月:bcpandas and bcpyaz 是 Microsoft bcp 实用程序的包装器。


更新 - 2019 年 4 月:正如@SimonLang 的评论所述,SQL 服务器 2017 及更高版本下的 BULK INSERT 显然支持 CSV 文件中的文本限定符(参考:here ).


BULK INSERT 几乎肯定比逐行读取源文件并为每一行执行常规 INSERT 快 很多 。但是,BULK INSERT 和 BCP 都对 CSV 文件有很大的限制,因为它们无法处理文本限定符(参考:here)。也就是说,如果您的 CSV 文件没有其中包含合格的文本字符串...

1,Gord Thompson,2015-04-15
2,Bob Loblaw,2015-04-07

...那么您可以批量插入它,但如果它包含文本限定符(因为某些文本值包含逗号)...

1,"Thompson, Gord",2015-04-15
2,"Loblaw, Bob",2015-04-07

... 那么 BULK INSERT 无法处理它。不过,将此类 CSV 文件预处理为竖线分隔文件总体上可能会更快...

1|Thompson, Gord|2015-04-15
2|Loblaw, Bob|2015-04-07

... 或制表符分隔的文件(其中 表示制表符)...

1→Thompson, Gord→2015-04-15
2→Loblaw, Bob→2015-04-07

... 然后批量插入该文件。对于后者(制表符分隔)文件,BULK INSERT 代码看起来像这样:

import pypyodbc
conn_str = "DSN=myDb_SQLEXPRESS;"
cnxn = pypyodbc.connect(conn_str)
crsr = cnxn.cursor()
sql = """
BULK INSERT myDb.dbo.SpikeData123
FROM 'C:\__tmp\biTest.txt' WITH (
    FIELDTERMINATOR='\t',
    ROWTERMINATOR='\n'
    );
"""
crsr.execute(sql)
cnxn.commit()
crsr.close()
cnxn.close()

注意:如评论中所述,执行BULK INSERT语句仅适用于SQL服务器实例可以直接读取源文件的情况。对于源文件位于远程客户端的情况,请参阅

如对另一个答案的评论所述,T-SQL BULK INSERT 命令仅在要导入的文件与 SQL 服务器位于同一台计算机上时才有效实例或位于 SQL 服务器实例可以读取的 SMB/CIFS 网络位置。因此它可能不适用于源文件在远程客户端上的情况。

pyodbc 4.0.19 添加了一个 Cursor#fast_executemany 功能,在这种情况下可能会有帮助。 fast_executemany默认为"off",下面测试代码...

cnxn = pyodbc.connect(conn_str, autocommit=True)
crsr = cnxn.cursor()
crsr.execute("TRUNCATE TABLE fast_executemany_test")

sql = "INSERT INTO fast_executemany_test (txtcol) VALUES (?)"
params = [(f'txt{i:06d}',) for i in range(1000)]
t0 = time.time()
crsr.executemany(sql, params)
print(f'{time.time() - t0:.1f} seconds')

... 在我的测试机器上执行大约需要 22 秒。只需添加 crsr.fast_executemany = True ...

cnxn = pyodbc.connect(conn_str, autocommit=True)
crsr = cnxn.cursor()
crsr.execute("TRUNCATE TABLE fast_executemany_test")

crsr.fast_executemany = True  # new in pyodbc 4.0.19

sql = "INSERT INTO fast_executemany_test (txtcol) VALUES (?)"
params = [(f'txt{i:06d}',) for i in range(1000)]
t0 = time.time()
crsr.executemany(sql, params)
print(f'{time.time() - t0:.1f} seconds')

...将执行时间减少到 1 秒多一点。

这个问题让我很沮丧,直到我在 SO 上找到这个 post 之前,我使用 fast_executemany 并没有看到太大的改进。具体来说,Bryan Bailiache 关于 max varchar 的评论。我一直在使用 SQLAlchemy,甚至确保更好的数据类型参数也没有解决我的问题;然而,切换到 pyodbc 确实如此。我还采纳了 Michael Moura 关于使用临时 table 的建议,发现它节省了更多时间。我写了一个函数以防万一有人觉得它有用。我写它是为了插入一个列表或列表列表。我使用 SQLAlchemy 和 Pandas to_sql 插入相同数据的时间从有时超过 40 分钟缩短到不到 4 秒。不过我可能一直在滥用我以前的方法。

connection

def mssql_conn():
    conn = pyodbc.connect(driver='{ODBC Driver 17 for SQL Server}',
                          server=os.environ.get('MS_SQL_SERVER'),
                          database='EHT',
                          uid=os.environ.get('MS_SQL_UN'),
                          pwd=os.environ.get('MS_SQL_PW'),
                          autocommit=True)
    return conn

Insert function

def mssql_insert(table,val_lst,truncate=False,temp_table=False):
    '''Use as direct connection to database to insert data, especially for
       large inserts. Takes either a single list (for one row),
       or list of list (for multiple rows). Can either append to table
       (default) or if truncate=True, replace existing.'''
    conn = mssql_conn()
    cursor = conn.cursor()
    cursor.fast_executemany = True
    tt = False
    qm = '?,'
    if isinstance(val_lst[0],list):
        rows = len(val_lst)
        params = qm * len(val_lst[0])
    else:
        rows = 1
        params = qm * len(val_lst)
        val_lst = [val_lst]
    params = params[:-1]
    if truncate:
        cursor.execute(f"TRUNCATE TABLE {table}")
    if temp_table:
        #create a temp table with same schema
        start_time = time.time()
        cursor.execute(f"SELECT * INTO ##{table} FROM {table} WHERE 1=0")
        table = f"##{table}"
        #set flag to indicate temp table was used
        tt = True
    else:
        start_time = time.time()
    #insert into either existing table or newly created temp table
    stmt = f"INSERT INTO {table} VALUES ({params})"
    cursor.executemany(stmt,val_lst)
    if tt:
        #remove temp moniker and insert from temp table
        dest_table = table[2:]
        cursor.execute(f"INSERT INTO {dest_table} SELECT * FROM {table}")
        print('Temp table used!')
        print(f'{rows} rows inserted into the {dest_table} table in {time.time() - 
              start_time} seconds')
    else:
        print('No temp table used!')
        print(f'{rows} rows inserted into the {table} table in {time.time() - 
              start_time} seconds')
    cursor.close()
    conn.close()

我的控制台结果首先使用临时 table 然后不使用临时(在这两种情况下,table 包含执行时的数据和 Truncate=True):

No temp table used!
18204 rows inserted into the CUCMDeviceScrape_WithForwards table in 10.595500707626343 
seconds

Temp table used!
18204 rows inserted into the CUCMDeviceScrape_WithForwards table in 3.810380458831787 
seconds

FWIW,我给出了一些插入 SQL 服务器的方法,我自己进行了一些测试。实际上,我能够通过使用 SQL Server Batches 和 pyodbcCursor.execute 语句获得最快的结果。 save to csv 和 BULK INSERT 我没有测试,不知道比较如何。

这是我关于我所做的测试的博客: http://jonmorisissqlblog.blogspot.com/2021/05/python-pyodbc-and-batch-inserts-to-sql.html