使用 pyODBC 的 fast_executemany 加速 pandas.DataFrame.to_sql

Speeding up pandas.DataFrame.to_sql with fast_executemany of pyODBC

我想发送一个大 pandas.DataFrame 到远程服务器 运行 MS SQL。我现在的做法是将 data_frame 对象转换为元组列表,然后使用 pyODBC 的 executemany() 函数将其发送出去。它是这样的:

 import pyodbc as pdb

 list_of_tuples = convert_df(data_frame)

 connection = pdb.connect(cnxn_str)

 cursor = connection.cursor()
 cursor.fast_executemany = True
 cursor.executemany(sql_statement, list_of_tuples)
 connection.commit()

 cursor.close()
 connection.close()

然后我开始怀疑是否可以通过使用 data_frame.to_sql() 方法来加快速度(或至少提高可读性)。我想出了以下解决方案:

 import sqlalchemy as sa

 engine = sa.create_engine("mssql+pyodbc:///?odbc_connect=%s" % cnxn_str)
 data_frame.to_sql(table_name, engine, index=False)

现在代码可读性更强,但上传速度至少慢了 150 倍...

有没有办法在使用 SQLAlchemy 时翻转 fast_executemany

我正在使用 pandas-0.20.3、pyODBC-4.0.21 和 sqlalchemy-1.1.13。

在联系了SQLAlchemy的开发者之后,出现了解决这个问题的办法。非常感谢他们的出色工作!

必须使用游标执行事件并检查是否已引发 executemany 标志。如果确实如此,请打开 fast_executemany 选项。例如:

from sqlalchemy import event

@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
    if executemany:
        cursor.fast_executemany = True

可以找到有关执行事件的更多信息 here


更新: SQLAlchemy 1.3.0 中添加了对 pyodbcfast_executemany 的支持,因此不再需要此 hack。

编辑(2019-03-08): Gord Thompson 在下面评论了来自 sqlalchemy 更新日志的好消息:Since SQLAlchemy 1.3.0,发布于 2019-03-04,sqlalchemy 现在支持 engine = create_engine(sqlalchemy_url, fast_executemany=True) mssql+pyodbc 方言。也就是说,不再需要定义一个函数并使用 @event.listens_for(engine, 'before_cursor_execute') 这意味着可以删除下面的函数,只需要在 create_engine 语句中设置标志 - 并且仍然保留speed-up.

原文Post:

刚刚在 post 上创建了一个帐户。我想在上面的线程下面发表评论,因为它是对已经提供的答案的跟进。上面的解决方案适用于 Microsft SQL 存储上的版本 17 SQL 驱动程序,从基于 Ubuntu 的安装中写入。

我用来显着加快速度(说话 >100x speed-up)的完整代码如下。这是一个 turn-key 片段,前提是您使用相关详细信息更改了连接字符串。对于上面的 poster,非常感谢您提供的解决方案,因为我已经花了很长时间来寻找这个解决方案。

import pandas as pd
import numpy as np
import time
from sqlalchemy import create_engine, event
from urllib.parse import quote_plus


conn =  "DRIVER={ODBC Driver 17 for SQL Server};SERVER=IP_ADDRESS;DATABASE=DataLake;UID=USER;PWD=PASS"
quoted = quote_plus(conn)
new_con = 'mssql+pyodbc:///?odbc_connect={}'.format(quoted)
engine = create_engine(new_con)


@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
    print("FUNC call")
    if executemany:
        cursor.fast_executemany = True


table_name = 'fast_executemany_test'
df = pd.DataFrame(np.random.random((10**4, 100)))


s = time.time()
df.to_sql(table_name, engine, if_exists = 'replace', chunksize = None)
print(time.time() - s)

根据下面的评论,我想花一些时间来解释有关 pandas to_sql 实现和查询处理方式的一些限制。有两件事可能导致 MemoryError 被引发 afaik:

1) 假设您正在写入远程 SQL 存储。当您尝试使用 to_sql 方法编写大型 pandas DataFrame 时,它​​会将整个 DataFrame 转换为值列表。这种转换比原始 DataFrame 占用更多的 RAM(在它之上,因为旧的 DataFrame 仍然存在于 RAM 中)。此列表提供给您的 ODBC 连接器的最终 executemany 调用。我认为 ODBC 连接器在处理如此大的查询时遇到了一些麻烦。解决这个问题的一种方法是为 to_sql 方法提供一个 chunksize 参数(10**5 似乎是最佳的,在 2 CPU 7GB 上提供大约 600 mbit/s (!) 的写入速度来自 Azure 的 ram MSSQL 存储应用程序 - 顺便说一句,不推荐 Azure)。因此,第一个限制是查询大小,可以通过提供 chunksize 参数来规避。但是,这不会使您能够编写大小为 10**7 或更大的数据帧(至少在我正在使用的具有 ~55GB RAM 的 VM 上),问题 nr 2。

这可以通过用 np.split 分解 DataFrame 来规避(10**6 大小的 DataFrame 块)这些可以迭代地写掉。当我为 pandas 本身的核心中的 to_sql 方法准备好解决方案时,我将尝试提出拉取请求,这样您就不必每次都这样做 pre-breaking 了。无论如何,我最终编写了一个类似于(不是 turn-key)的函数,如下所示:

import pandas as pd
import numpy as np

def write_df_to_sql(df, **kwargs):
    chunks = np.split(df, df.shape()[0] / 10**6)
    for chunk in chunks:
        chunk.to_sql(**kwargs)
    return True

可以在此处查看上述代码段的更完整示例:https://gitlab.com/timelord/timelord/blob/master/timelord/utils/connector.py

这是我写的 class,它包含补丁并减轻了与 SQL 建立连接所带来的一些必要开销。还是得写一些文档。我还计划将补丁贡献给 pandas 本身,但还没有找到一个很好的方法来做到这一点。

希望对您有所帮助。

我只是想 post 这个完整的例子作为额外的,high-performance 选项给那些可以使用新的 turbodbc 库的人:http://turbodbc.readthedocs.io/en/latest/

pandas.to_sql() 之间显然有很多选项在变化,通过 sqlalchemy 触发 fast_executemany,直接使用 pyodbc 和 tuples/lists/etc,甚至尝试批量上传平面文件。

希望随着当前 pandas 项目中功能的发展或在未来包括诸如 turbodbc 集成之类的东西,以下内容可能会让生活变得更加愉快。

import pandas as pd
import numpy as np
from turbodbc import connect, make_options
from io import StringIO

test_data = '''id,transaction_dt,units,measures
               1,2018-01-01,4,30.5
               1,2018-01-03,4,26.3
               2,2018-01-01,3,12.7
               2,2018-01-03,3,8.8'''

df_test = pd.read_csv(StringIO(test_data), sep=',')
df_test['transaction_dt'] = pd.to_datetime(df_test['transaction_dt'])

options = make_options(parameter_sets_to_buffer=1000)
conn = connect(driver='{SQL Server}', server='server_nm', database='db_nm', turbodbc_options=options)

test_query = '''DROP TABLE IF EXISTS [db_name].[schema].[test]

                CREATE TABLE [db_name].[schema].[test]
                (
                    id int NULL,
                    transaction_dt datetime NULL,
                    units int NULL,
                    measures float NULL
                )

                INSERT INTO [db_name].[schema].[test] (id,transaction_dt,units,measures)
                VALUES (?,?,?,?) '''

cursor.executemanycolumns(test_query, [df_test['id'].values, df_test['transaction_dt'].values, df_test['units'].values, df_test['measures'].values]

turbodbc 在许多用例中应该非常快(尤其是 numpy 数组)。请观察将数据框列中的底层 numpy 数组作为参数直接传递给查询是多么简单。我还相信这有助于防止创建过度增加内存消耗的中间对象。希望这对您有所帮助!

似乎 Pandas 0.23.0 和 0.24.0 带有 PyODBC,这会阻止 fast executemany 提供帮助——每个块发出一个 INSERT ... VALUES ... 语句。多值插入块是对旧的慢速 executemany 默认值的改进,但至少在简单测试中,快速 executemany 方法仍然盛行,更不用说不需要手动 chunksize 计算,这是多值插入所必需的。强制旧的行为可以通过 monkeypatching 来完成,如果以后没有提供配置选项:

import pandas.io.sql

def insert_statement(self, data, conn):
    return self.table.insert(), data

pandas.io.sql.SQLTable.insert_statement = insert_statement

未来就在这里,至少在 master 分支中,可以使用 to_sql() 的关键字参数 method= 来控制插入方法。它默认为 None,这会强制执行 executemany 方法。传递 method='multi' 会导致使用多值插入。它甚至可以用于实现 DBMS 特定方法,例如 Postgresql COPY.

正如@Pylander 指出的那样

到目前为止,Turbodbc 是数据摄取的最佳选择!

我非常兴奋,所以我在我的 github 和媒体上写了一个 'blog': 请检查 https://medium.com/@erickfis/etl-process-with-turbodbc-1d19ed71510e

一个工作示例并与 pandas.to_sql

进行比较

长话短说,

使用 turbodbc 我在 3 秒内得到了 10000 行(77 列)

和pandas.to_sql 我在 198 秒内得到了相同的 10000 行(77 列)...

下面是我正在做的所有细节

进口:

import sqlalchemy
import pandas as pd
import numpy as np
import turbodbc
import time

加载并处理一些数据 - 将我的 sample.pkl 替换为您的:

df = pd.read_pickle('sample.pkl')

df.columns = df.columns.str.strip()  # remove white spaces around column names
df = df.applymap(str.strip) # remove white spaces around values
df = df.replace('', np.nan)  # map nans, to drop NAs rows and columns later
df = df.dropna(how='all', axis=0)  # remove rows containing only NAs
df = df.dropna(how='all', axis=1)  # remove columns containing only NAs
df = df.replace(np.nan, 'NA')  # turbodbc hates null values...

使用 sqlAlchemy

创建 table

不幸的是,turbodbc 需要大量 sql 体力劳动来创建 tables 并在其上插入数据。

幸运的是,Python 是一种纯粹的快乐,我们可以将编写 sql 代码的过程自动化。

第一步是创建 table 来接收我们的数据。但是,如果您的 table 有多个列,则手动编写 sql 代码创建 table 可能会出现问题。在我的例子中,table 通常有 240 列!

这就是 sqlAlchemy 和 pandas 仍然可以帮助我们的地方:pandas 不适合写入大量行(本例中为 10000),但是仅6行,行首的table?这样,我们就可以自动化创建 tables.

的过程

创建sqlAlchemy 连接:

mydb = 'someDB'

def make_con(db):
    """Connect to a specified db."""
    database_connection = sqlalchemy.create_engine(
        'mssql+pymssql://{0}:{1}@{2}/{3}'.format(
            myuser, mypassword,
            myhost, db
            )
        )
    return database_connection

pd_connection = make_con(mydb)

在 SQL 服务器上创建 table

使用 pandas + sqlAlchemy,但只是为前面提到的 turbodbc 准备空间。请注意此处的 df.head():我们使用 pandas + sqlAlchemy 仅插入 6 行数据。这将 运行 非常快,并且正在完成 table 创建的自动化。

table = 'testing'
df.head().to_sql(table, con=pd_connection, index=False)

既然table已经到位,那我们就认真起来吧。

Turbodbc 连接:

def turbo_conn(mydb):
    """Connect to a specified db - turbo."""
    database_connection = turbodbc.connect(
                                            driver='ODBC Driver 17 for SQL Server',
                                            server=myhost,
                                            database=mydb,
                                            uid=myuser,
                                            pwd=mypassword
                                        )
    return database_connection

正在为 turbodbc 准备 sql 命令和数据。让我们将此代码创建自动化并发挥创意:

def turbo_write(mydb, df, table):
    """Use turbodbc to insert data into sql."""
    start = time.time()
    # preparing columns
    colunas = '('
    colunas += ', '.join(df.columns)
    colunas += ')'

    # preparing value place holders
    val_place_holder = ['?' for col in df.columns]
    sql_val = '('
    sql_val += ', '.join(val_place_holder)
    sql_val += ')'

    # writing sql query for turbodbc
    sql = f"""
    INSERT INTO {mydb}.dbo.{table} {colunas}
    VALUES {sql_val}
    """

    # writing array of values for turbodbc
    valores_df = [df[col].values for col in df.columns]

    # cleans the previous head insert
    with connection.cursor() as cursor:
        cursor.execute(f"delete from {mydb}.dbo.{table}")
        connection.commit()

    # inserts data, for real
    with connection.cursor() as cursor:
        try:
            cursor.executemanycolumns(sql, valores_df)
            connection.commit()
        except Exception:
            connection.rollback()
            print('something went wrong')

    stop = time.time() - start
    return print(f'finished in {stop} seconds')

使用 turbodbc 写入数据 - 我在 3 秒内得到了 10000 行(77 列):

turbo_write(mydb, df.sample(10000), table)

Pandas 方法比较 - 我在 198 秒内得到了相同的 10000 行(77 列)…

table = 'pd_testing'

def pandas_comparisson(df, table):
    """Load data using pandas."""
    start = time.time()
    df.to_sql(table, con=pd_connection, index=False)
    stop = time.time() - start
    return print(f'finished in {stop} seconds')

pandas_comparisson(df.sample(10000), table)

环境和条件

Python 3.6.7 :: Anaconda, Inc.
TURBODBC version ‘3.0.0’
sqlAlchemy version ‘1.2.12’
pandas version ‘0.23.4’
Microsoft SQL Server 2014
user with bulk operations privileges

请检查 https://erickfis.github.io/loose-code/ 以获取此代码中的更新!

SQL 服务器 INSERT 性能:pyodbc 与 turbodbc

当使用 to_sql 将 pandas DataFrame 上传到 SQL 服务器时,turbodbc 肯定会比没有 fast_executemany 的 pyodbc 更快。但是,在为 pyodbc 启用 fast_executemany 的情况下,这两种方法产生的性能基本相同。

测试环境:

[venv1_pyodbc]
pyodbc 2.0.25

[venv2_turbodbc]
turbodbc 3.0.0
sqlalchemy-turbodbc 0.1.0

[两者共有]
Python 3.6.4 64 位 Windows
SQL炼金术 1.3.0b1
pandas0.23.4
麻木 1.15.4

测试代码:

# for pyodbc
engine = create_engine('mssql+pyodbc://sa:whatever@SQL_panorama', fast_executemany=True)
# for turbodbc
# engine = create_engine('mssql+turbodbc://sa:whatever@SQL_panorama')

# test data
num_rows = 10000
num_cols = 100
df = pd.DataFrame(
    [[f'row{x:04}col{y:03}' for y in range(num_cols)] for x in range(num_rows)],
    columns=[f'col{y:03}' for y in range(num_cols)]
)

t0 = time.time()
df.to_sql("sqlalchemy_test", engine, if_exists='replace', index=None)
print(f"pandas wrote {num_rows} rows in {(time.time() - t0):0.1f} seconds")

每个环境的测试 运行 十二 (12) 次,丢弃每个环境的最佳和最差时间。结果(以秒为单位):

   rank  pyodbc  turbodbc
   ----  ------  --------
      1    22.8      27.5
      2    23.4      28.1
      3    24.6      28.2
      4    25.2      28.5
      5    25.7      29.3
      6    26.9      29.9
      7    27.0      31.4
      8    30.1      32.1
      9    33.6      32.5
     10    39.8      32.9
   ----  ------  --------
average    27.9      30.0

我 运行 遇到了同样的问题,但使用的是 PostgreSQL。他们现在刚刚发布 pandas 版本 0.24.0 并且 to_sql 函数中有一个名为 method 的新参数解决了我的问题。

from sqlalchemy import create_engine

engine = create_engine(your_options)
data_frame.to_sql(table_name, engine, method="multi")

上传速度对我来说快了 100 倍。 如果您要发送大量数据,我还建议设置 chunksize 参数。

只是想添加到@J.K. 的回答中。

如果您使用这种方法:

@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
    if executemany:
        cursor.fast_executemany = True

您收到此错误:

"sqlalchemy.exc.DBAPIError: (pyodbc.Error) ('HY010', '[HY010] [Microsoft][SQL Server Native Client 11.0]Function sequence error (0) (SQLParamData)') [SQL: 'INSERT INTO ... (...) VALUES (?, ?)'] [parameters: ((..., ...), (..., ...)] (Background on this error at: http://sqlalche.me/e/dbapi)"

像这样编码您的字符串值:'yourStringValue'.encode('ascii')

这将解决您的问题。

我只是修改了引擎行,这帮助我将插入速度提高了 100 倍。

旧代码-

import json
import maya
import time
import pandas
import pyodbc
import pandas as pd
from sqlalchemy import create_engine

retry_count = 0
retry_flag = True

hostInfoDf = pandas.read_excel('test.xlsx', sheet_name='test')
print("Read Ok")

engine = create_engine("mssql+pyodbc://server_name/db_name?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server")

while retry_flag and retry_count < 5:
  try:
    df.to_sql("table_name",con=engine,if_exists="replace",index=False,chunksize=5000,schema="dbo")
    retry_flag = False
  except:
    retry_count = retry_count + 1
    time.sleep(30)

修改引擎系列 -

来自-

engine = create_engine("mssql+pyodbc://server_name/db_name?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server")

至-

engine = create_engine("mssql+pyodbc://server_name/db_name?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server", fast_executemany=True)

向我询问任何与 python 到 SQL 连接相关的问题,我很乐意为您提供帮助。