使用 sqlalchemy executemany 插入 pandas DataFrame 时管理 nan

Managing nan when inserting a pandas DataFrame with sqlalchemy executemany

我正在尝试使用 sqlalchemy 游标方法 executemany 将 pandas 数据框插入到 mysql 数据库中。这是批量插入数据的一种快速有效的方法,但是没有 MySQLdb._exceptions.ProgrammingErrorMySQLdb._exceptions.OperationalError 就无法插入 pandas.NA/numpy.nan/None 值。

import pandas as pd
from sqlalchemy import create_engine

def insert(dff, table_name):
    engine = create_engine('mysql://user:password@host:port/database?charset=utf8'), echo=False)

    # The query without the values to insert
    query_template = """
    INSERT INTO %s (%s)
    VALUES (%s)
    ON DUPLICATE KEY UPDATE %s;
    """ % (
        table_name,
        ",".join(["`%s`" % colname for colname in dff.columns]),
        ','.join(['%s'] * len(dff.columns)),
        ",".join(["`%s`=VALUES(`%s`)" % (colname, colname) for colname in dff.columns])
    )

    # Connection and bulk insert
    with engine.begin() as connection:
        raw_connection = connection.engine.raw_connection()
        mycursor = raw_connection.cursor()
        mycursor.executemany(query_template, dff.values.tolist())  # /!\ Here is the problem /!\
        raw_connection.commit()
    engine.dispose()

dff = pd.DataFrame({"col1": ["a", "b", "c", "d"], "col2": [1, pd.NA, 2, 3], "col3": [0.0, 1, pd.NA, 3.43]})
insert(dff, "my_table")

我可能可以通过遍历 DataFrame 内容来预先计算每个查询或一个大查询,但这是一种非常低效的工作方式。使用 pandas DataFrame to_sql 方法在微调 ON DUPLICATE KEY 时会缺乏灵活性,因此它不是一个选项。

使用自定义查询和 nan 值执行 pandas DataFrame 批量插入的最有效方法是什么?

真正的问题是 dff.values 创建了一个无法计算 None 值的类型化矩阵 intfloat。但实际上 executemany 可以插入 None 值。

我找到的最快的解决方案是更正提供给 executemany 的列表列表,而不是在创建列表列表之前更正数据框内容。

我插入的数据不再是 dff.values.tolist(),而是:

inserted_data = [
    [None if pd.isnull(value) else value for value in sublist] \
    for sublist in dff.values.tolist()]

mycursor.executemany(query_template, inserted_data )