How do I correctly perform an UPSERT on a SQLite3 DB using SQLAlchemy Core?

I have a list of records, many of which are duplicates, that I'm trying to insert into a SQLite3 database using SQLAlchemy Core 1.4.35 and Python 3.9.2.

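If the module tries to insert a record that does not yet exist in the table, I want the insert to succeed, and I want the primary key of the newly created record returned.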

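But if the module tries to insert a record that already exists, I want the insert to fail. In that case I want to SELECT and return the ID of the existing record.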

When I run the code below, I run into two problems, and I can't figure out what is causing either of them.

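  1. If I remove all of the code indented under the except IntegrityError as err line, the module completes successfully. But the database table ends up with a bunch of duplicate records, even though the table has a UNIQUE constraint.
  2. If I keep that code and try to SELECT the ID of the existing record while handling the exception, I get the error shown below.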

What is causing these two problems?

How do I prevent duplicate records from being inserted, and SELECT the ID of the existing record instead?

I'm still learning SQL, so I may be missing something obvious.


Possibly relevant update

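All of the duplicate records I checked have one thing in common: one of their fields contains a NULL. Does SQLite fail to recognize NULLs when it checks a UNIQUE table constraint?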

Input data and table contents

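The spreadsheet linked below contains the sample data I used in this test, along with the contents of the database table after running the code below. The records highlighted in yellow are the duplicate records in the table.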

https://docs.google.com/spreadsheets/d/1dS75vmzzNAqGShqakRwN8FUkL7RugiTo/edit?usp=sharing&ouid=102857691407472073826&rtpof=true&sd=true


Test database

BEGIN TRANSACTION;
CREATE TABLE IF NOT EXISTS "Containers" (
    "ContainerID"   INTEGER NOT NULL UNIQUE,
    "ContainerName" TEXT,
    "Qty"   INTEGER,
    "Size"  INTEGER,
    "Unit"  TEXT,
    "ClientOwned"   TEXT,
    "VendorOwned"   TEXT,
    PRIMARY KEY("ContainerID"),
    UNIQUE("ContainerName","Qty","Size","Unit","ClientOwned","VendorOwned")
);
COMMIT;

Test Python module

# Built-In
import datetime
import sys
from timeit import default_timer as timer

# Third-Party
import pandas as pd
import pprint
import sqlalchemy
from loguru import logger
from sqlalchemy import create_engine, MetaData, Table, select
from sqlalchemy.exc import IntegrityError

db_file = 'upsert_bug_test.db'
data_file = 'container_sample_data.xlsx'

pp = pprint.PrettyPrinter(indent=3)

class TestDatabase:
    def __init__(self):
        self.engine = create_engine(f'sqlite:///{db_file}')
        self.conn = self.engine.connect()
        self.metadata_obj = MetaData()

        self._reflect_db()

    def _reflect_db(self):
        """Get the schema for the tables and views that already exist in the database."""

        self.db_containers = Table('Containers', self.metadata_obj, autoload_with=self.engine)


# Dictionary to map the fields in the data file to fields in the database
# Mapping key = database table
# Mapping value = list of field dictionaries
#       Field key = name of field in report
#       Field value = name of field in database table
mapping = {
    'Containers': [
        {'Container': 'ContainerName'},
        {'Qty': 'Qty'}, 
        {'Size': 'Size'},
        {'Unit': 'Unit'}, 
        {'Owned by Client': 'ClientOwned'}, 
        {'Owned by Vendor': 'VendorOwned'},
    ],
}


def build_insert_dict(record: dict, section_mapping: list):
    '''Using the mapping dictionary, build and return a dictionary of parameters that will be
    passed to insert() and select().'''

    return_data = {}

    for pair in section_mapping:
        for report_field in pair:
            report_value = record.get(report_field)
            db_field = pair.get(report_field)
            return_data.update({db_field: report_value})

    return return_data


if __name__ == '__main__':
    # Start the timer
    start = timer()

    # Create an instance of the database object
    db = TestDatabase()

    # Load the data from the sample file
    logger.info('Loading records from sample file...')

    data_df = pd.read_excel(pd.ExcelFile(data_file), 'Sheet1')
    new_data_df = data_df.astype(object).where(data_df.notna(), None)
    data = new_data_df.to_dict('records')

    # Insert each record from the data file into the database. If a record already exists,
    # get the ID for that record and return it.
    for record in data:
        container_data = build_insert_dict(record, mapping.get('Containers'))
        try:
            ins = db.db_containers.insert().values(**container_data)
            res = db.conn.execute(ins)
            container_id = res.inserted_primary_key[0]
        except IntegrityError as err:
            logger.info(f'Record already exists in the database. Attempting to find the ID for that record...')
            sel = select(db.db_containers.c.ContainerID).filter_by(**container_data)
            res = db.conn.execute(sel)
            old_container_id = res.fetchone()
            if not old_container_id:
                logger.error('Failed to get previously inserted container id! Investigate')
                exit(-1)
            else:
                container_id = old_container_id[0]
                logger.info(f'Found ID {container_id}')

    stop = timer()
    done = stop - start
    migration_time = datetime.timedelta(seconds=done)

    logger.success(f'Inserted all unique records into the database in {migration_time}')

SELECT error

(oaktier-env) PS C:\Oaktier\oaktier\tasks> python .\upsert_test.py
2022-04-21 11:27:26.670 | INFO     | __main__:<module>:76 - Loading records from sample file...
2022-04-21 11:27:27.205 | INFO     | __main__:<module>:91 - Record already exists in the database. Attempting to find the ID for that record...
2022-04-21 11:27:27.210 | INFO     | __main__:<module>:100 - Found ID 1
2022-04-21 11:27:27.211 | INFO     | __main__:<module>:91 - Record already exists in the database. Attempting to find the ID for that record...
2022-04-21 11:27:27.215 | INFO     | __main__:<module>:100 - Found ID 2
2022-04-21 11:27:27.216 | INFO     | __main__:<module>:91 - Record already exists in the database. Attempting to find the ID for that record...
Traceback (most recent call last):
  File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1819, in _execute_context
    self.dialect.do_execute(
  File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\default.py", line 732, in do_execute
    cursor.execute(statement, parameters)
sqlite3.IntegrityError: UNIQUE constraint failed: Containers.ContainerName, Containers.Qty, Containers.Size, Containers.Unit, Containers.ClientOwned, Containers.VendorOwned

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Oaktier\oaktier\tasks\upsert_test.py", line 88, in <module>
    res = db.conn.execute(ins)
  File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1306, in execute
    return meth(self, multiparams, params, _EMPTY_EXECUTION_OPTS)
  File "C:\oaktier-env\lib\site-packages\sqlalchemy\sql\elements.py", line 325, in _execute_on_connection
    return connection._execute_clauseelement(
  File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1498, in _execute_clauseelement
    ret = self._execute_context(
  File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1862, in _execute_context
    self._handle_dbapi_exception(
  File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 2043, in _handle_dbapi_exception
    util.raise_(
  File "C:\oaktier-env\lib\site-packages\sqlalchemy\util\compat.py", line 207, in raise_
    raise exception
  File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1819, in _execute_context
    self.dialect.do_execute(
  File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\default.py", line 732, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.IntegrityError: (sqlite3.IntegrityError) UNIQUE constraint failed: Containers.ContainerName, Containers.Qty, Containers.Size, Containers.Unit, Containers.ClientOwned, Containers.VendorOwned
[SQL: INSERT INTO "Containers" ("ContainerName", "Qty", "Size", "Unit", "ClientOwned", "VendorOwned") VALUES (?, ?, ?, ?, ?, ?)]
[parameters: ('Dumpster (FEL/REL)', 1, 2.0, 'cu. Yd.', 'None', 'Dumpster (FEL/REL)')]
(Background on this error at: https://sqlalche.me/e/14/gkpj)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1819, in _execute_context
    self.dialect.do_execute(
  File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\default.py", line 732, in do_execute
    cursor.execute(statement, parameters)
sqlite3.InterfaceError: Error binding parameter 0 - probably unsupported type.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Oaktier\oaktier\tasks\upsert_test.py", line 93, in <module>
    res = db.conn.execute(sel)
  File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1306, in execute
    return meth(self, multiparams, params, _EMPTY_EXECUTION_OPTS)
  File "C:\oaktier-env\lib\site-packages\sqlalchemy\sql\elements.py", line 325, in _execute_on_connection
    return connection._execute_clauseelement(
  File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1498, in _execute_clauseelement
    ret = self._execute_context(
  File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1862, in _execute_context
    self._handle_dbapi_exception(
  File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 2043, in _handle_dbapi_exception
    util.raise_(
  File "C:\oaktier-env\lib\site-packages\sqlalchemy\util\compat.py", line 207, in raise_
    raise exception
  File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1819, in _execute_context
    self.dialect.do_execute(
  File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\default.py", line 732, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.InterfaceError: (sqlite3.InterfaceError) Error binding parameter 0 - probably unsupported type.
[SQL: SELECT "Containers"."ContainerID"
FROM "Containers"
WHERE "Containers"."ContainerName" = ? AND "Containers"."Qty" = ? AND "Containers"."Size" = ? AND "Containers"."Unit" = ? AND "Containers"."ClientOwned" = ? AND "Containers"."VendorOwned" = ?]
[parameters: ('Dumpster (FEL/REL)', 1, 2.0, 'cu. Yd.', 'None', 'Dumpster (FEL/REL)')]
(Background on this error at: https://sqlalche.me/e/14/rvf5)

According to the SQLite documentation on UNIQUE constraints:

For the purposes of UNIQUE constraints, NULL values are considered distinct from all other values, including other NULLs.

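So NULLs are not ignored; they are treated as unique entities. In fact, even None in Python and NaN in Pandas/NumPy are not equal to each other. For SQLite, therefore, the rows you highlighted that contain NULLs are not duplicates but genuinely unique rows. For the empty strings, there may be a whitespace issue.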
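The behavior described above can be reproduced with a minimal sketch using only Python's built-in sqlite3 module. The demo table and its columns are hypothetical stand-ins for illustration, not the Containers table from the question.

```python
import sqlite3

# Hypothetical two-column table with a composite UNIQUE constraint.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE demo ("
    " id INTEGER PRIMARY KEY,"
    " name TEXT,"
    " owner TEXT,"
    " UNIQUE (name, owner))"
)

# Both inserts succeed: for UNIQUE purposes NULL is distinct from every
# other value, including another NULL.
conn.execute("INSERT INTO demo (name, owner) VALUES (?, ?)", ("Dumpster", None))
conn.execute("INSERT INTO demo (name, owner) VALUES (?, ?)", ("Dumpster", None))
null_rows = conn.execute(
    "SELECT COUNT(*) FROM demo WHERE owner IS NULL"
).fetchone()[0]

# With an empty-string placeholder the second insert is rejected.
conn.execute("INSERT INTO demo (name, owner) VALUES (?, ?)", ("Dumpster", ""))
try:
    conn.execute("INSERT INTO demo (name, owner) VALUES (?, ?)", ("Dumpster", ""))
    placeholder_rejected = False
except sqlite3.IntegrityError:
    placeholder_rejected = True

# "= NULL" never matches a row; only "IS NULL" finds the NULL rows.
equals_null_rows = conn.execute(
    "SELECT COUNT(*) FROM demo WHERE owner = NULL"
).fetchone()[0]

print(null_rows, placeholder_rejected, equals_null_rows)  # 2 True 0
```

The last query also suggests why an equality-based SELECT cannot find a row by a NULL field: the comparison has to be written with IS NULL instead.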

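As a workaround, consider filling in a non-None placeholder, such as an empty string, and replacing it with NULL after the insert queries have run. Avoid converting the entire data frame to a single type; handle each column explicitly as needed.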

import pandas as pd

# data_file is the spreadsheet path defined in the question's module
with pd.ExcelFile(data_file) as wb:
    raw_df = pd.read_excel(wb, 'Sheet1')

# CLEAN EACH STRING COLUMN
# (assumes the columns already carry the database field names)
clean_df = (
    raw_df.assign(
        ContainerName = lambda x: x["ContainerName"].astype("string").str.strip(),
        Unit = lambda x: x["Unit"].astype("string").str.strip(),
        ClientOwned = lambda x: x["ClientOwned"].astype("string").str.strip(),
        VendorOwned = lambda x: x["VendorOwned"].astype("string").str.strip()
    ).where(raw_df.notna(), "")  # missing values become the empty-string placeholder
)

db_data = clean_df.to_dict('records')

Then, once the inserts are done, run the following, translated into SQLAlchemy syntax:

UPDATE Containers SET ContainerName = NULL WHERE ContainerName = '';
UPDATE Containers SET Unit = NULL WHERE Unit = '';
UPDATE Containers SET ClientOwned = NULL WHERE ClientOwned = '';
UPDATE Containers SET VendorOwned = NULL WHERE VendorOwned = '';
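One possible way to express these statements with SQLAlchemy Core is sketched below. It is only an illustration: the in-memory engine, the trimmed-down Containers definition (Qty and Size omitted), and the two sample rows are assumptions, not part of the original setup. In the question's module the reflected db.db_containers table would take the place of containers.

```python
from sqlalchemy import (
    Column, Integer, MetaData, Table, Text, create_engine, select, update,
)

# Assumed stand-in for the real database: in-memory SQLite and a
# simplified Containers table holding only the text columns.
engine = create_engine("sqlite:///:memory:")
metadata = MetaData()
containers = Table(
    "Containers", metadata,
    Column("ContainerID", Integer, primary_key=True),
    Column("ContainerName", Text),
    Column("Unit", Text),
    Column("ClientOwned", Text),
    Column("VendorOwned", Text),
)
metadata.create_all(engine)

text_columns = ["ContainerName", "Unit", "ClientOwned", "VendorOwned"]

with engine.begin() as conn:
    # Made-up sample rows that use "" as the placeholder for missing values.
    conn.execute(
        containers.insert(),
        [
            {"ContainerName": "Dumpster", "Unit": "",
             "ClientOwned": "", "VendorOwned": "Dumpster"},
            {"ContainerName": "Cart", "Unit": "gal",
             "ClientOwned": "Cart", "VendorOwned": ""},
        ],
    )

    # One UPDATE per column: SET <col> = NULL WHERE <col> = ''
    for name in text_columns:
        col = containers.c[name]
        conn.execute(update(containers).where(col == "").values({name: None}))

    rows = conn.execute(
        select(containers.c.Unit, containers.c.ClientOwned)
        .order_by(containers.c.ContainerID)
    ).fetchall()

print(rows)
```

Passing None to values() binds a SQL NULL, so each loop iteration corresponds to one of the UPDATE statements above.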