使用 sqlalchemy executemany 插入 pandas DataFrame 时管理 nan
Managing nan when inserting a pandas DataFrame with sqlalchemy executemany
我正在尝试使用 sqlalchemy 游标方法 executemany
将 pandas 数据框插入到 mysql 数据库中。这是批量插入数据的一种快速有效的方法,但是没有 MySQLdb._exceptions.ProgrammingError
或 MySQLdb._exceptions.OperationalError
就无法插入 pandas.NA
/numpy.nan
/None
值。
import pandas as pd
from sqlalchemy import create_engine
def insert(dff, table_name):
engine = create_engine('mysql://user:password@host:port/database?charset=utf8'), echo=False)
# The query without the values to insert
query_template = """
INSERT INTO %s (%s)
VALUES (%s)
ON DUPLICATE KEY UPDATE %s;
""" % (
table_name,
",".join(["`%s`" % colname for colname in dff.columns]),
','.join(['%s'] * len(dff.columns)),
",".join(["`%s`=VALUES(`%s`)" % (colname, colname) for colname in dff.columns])
)
# Connection and bulk insert
with engine.begin() as connection:
raw_connection = connection.engine.raw_connection()
mycursor = raw_connection.cursor()
mycursor.executemany(query_template, dff.values.tolist()) # /!\ Here is the problem /!\
raw_connection.commit()
engine.dispose()
dff = pd.DataFrame({"col1": ["a", "b", "c", "d"], "col2": [1, pd.NA, 2, 3], "col3": [0.0, 1, pd.NA, 3.43]})
insert(dff, "my_table")
我可能可以通过遍历 DataFrame 内容来预先计算每个查询或一个大查询,但这是一种非常低效的工作方式。使用 pandas DataFrame to_sql
方法在微调 ON DUPLICATE KEY
时会缺乏灵活性,因此它不是一个选项。
使用自定义查询和 nan 值执行 pandas DataFrame 批量插入的最有效方法是什么?
真正的问题是 dff.values
创建了一个无法计算 None
值的类型化矩阵 int 或 float。但实际上 executemany 可以插入 None
值。
我找到的最快的解决方案是更正提供给 executemany
的列表列表,而不是在创建列表列表之前更正数据框内容。
我插入的数据不再是 dff.values.tolist()
,而是:
inserted_data = [
[None if pd.isnull(value) else value for value in sublist] \
for sublist in dff.values.tolist()]
mycursor.executemany(query_template, inserted_data )
我正在尝试使用 sqlalchemy 游标方法 executemany
将 pandas 数据框插入到 mysql 数据库中。这是批量插入数据的一种快速有效的方法,但是没有 MySQLdb._exceptions.ProgrammingError
或 MySQLdb._exceptions.OperationalError
就无法插入 pandas.NA
/numpy.nan
/None
值。
import pandas as pd
from sqlalchemy import create_engine
def insert(dff, table_name):
engine = create_engine('mysql://user:password@host:port/database?charset=utf8'), echo=False)
# The query without the values to insert
query_template = """
INSERT INTO %s (%s)
VALUES (%s)
ON DUPLICATE KEY UPDATE %s;
""" % (
table_name,
",".join(["`%s`" % colname for colname in dff.columns]),
','.join(['%s'] * len(dff.columns)),
",".join(["`%s`=VALUES(`%s`)" % (colname, colname) for colname in dff.columns])
)
# Connection and bulk insert
with engine.begin() as connection:
raw_connection = connection.engine.raw_connection()
mycursor = raw_connection.cursor()
mycursor.executemany(query_template, dff.values.tolist()) # /!\ Here is the problem /!\
raw_connection.commit()
engine.dispose()
dff = pd.DataFrame({"col1": ["a", "b", "c", "d"], "col2": [1, pd.NA, 2, 3], "col3": [0.0, 1, pd.NA, 3.43]})
insert(dff, "my_table")
我可能可以通过遍历 DataFrame 内容来预先计算每个查询或一个大查询,但这是一种非常低效的工作方式。使用 pandas DataFrame to_sql
方法在微调 ON DUPLICATE KEY
时会缺乏灵活性,因此它不是一个选项。
使用自定义查询和 nan 值执行 pandas DataFrame 批量插入的最有效方法是什么?
真正的问题是 dff.values
创建了一个无法计算 None
值的类型化矩阵 int 或 float。但实际上 executemany 可以插入 None
值。
我找到的最快的解决方案是更正提供给 executemany
的列表列表,而不是在创建列表列表之前更正数据框内容。
我插入的数据不再是 dff.values.tolist()
,而是:
inserted_data = [
[None if pd.isnull(value) else value for value in sublist] \
for sublist in dff.values.tolist()]
mycursor.executemany(query_template, inserted_data )