如何使用 SQLAlchemy Core 在 SQLite3 数据库上正确执行 UPSERT?
How do I correctly perform an UPSERT on a SQLite3 DB using SQLAlchemy Core?
我有一个记录列表,其中很多是重复的,我正在尝试使用 SQLAlchemy Core 1.4.35 和 Python 3.9.2.
如果模块尝试插入 table 中尚不存在的记录,我希望插入成功,并且我希望 return 为创建的主键那条记录。
但是如果模块试图插入一条已经存在的记录,那么我希望插入失败。我想要 SELECT 和 return 现有记录的 ID。
当我 运行 下面的代码时,我 运行 遇到了两个问题,我无法弄清楚是什么导致了这两个问题。
如果我删除了 except IntegrityError as err
行下缩进的所有代码,那么模块会成功完成。但是数据库 table 有一堆重复的记录——尽管 table.
有 UNIQUE 约束
如果我保留该代码,并在处理异常时尝试 SELECT 现有记录的 ID - 我会收到以下错误。
这两个问题是什么引起的?
如何防止重复记录被插入,以及SELECT现有记录的ID?
我还在学习 SQL 所以我可能遗漏了一些明显的东西。
可能相关更新
我检查的所有重复记录都有一个共同点,就是其中一个字段包含一个NULL。 SQLite 在检查 UNIQUE table 约束时是否无法识别 NULL?
输入数据和table内容
下面链接的电子表格包含我在本次测试中使用的示例数据,以及 运行 执行以下代码后数据库 table 的内容。以黄色突出显示的记录是 table.
中的重复记录
测试数据库
BEGIN TRANSACTION;
CREATE TABLE IF NOT EXISTS "Containers" (
"ContainerID" INTEGER NOT NULL UNIQUE,
"ContainerName" TEXT,
"Qty" INTEGER,
"Size" INTEGER,
"Unit" TEXT,
"ClientOwned" TEXT,
"VendorOwned" TEXT,
PRIMARY KEY("ContainerID"),
UNIQUE("ContainerName","Qty","Size","Unit","ClientOwned","VendorOwned")
);
COMMIT;
测试Python模块
# Built-In
import datetime
import sys
from timeit import default_timer as timer
# Third-Party
import pandas as pd
import pprint
import sqlalchemy
from loguru import logger
from sqlalchemy import create_engine, MetaData, Table, select
from sqlalchemy.exc import IntegrityError
db_file = 'upsert_bug_test.db'
data_file = 'container_sample_data.xlsx'
pp = pprint.PrettyPrinter(indent=3)
class TestDatabase:
def __init__(self):
self.engine = create_engine(f'sqlite:///{db_file}')
self.conn = self.engine.connect()
self.metadata_obj = MetaData()
self._reflect_db()
def _reflect_db(self):
"""Get the schema for the tables and views that already exist in the database."""
self.db_containers = Table('Containers', self.metadata_obj, autoload_with=self.engine)
# Dictionary to map the fields in the data file to fields in the database
# Mapping key = database table
# Mapping value = list of field dictionaries
# Field key = name of field in report
# Field value = name of field in database table
mapping = {
'Containers': [
{'Container': 'ContainerName'},
{'Qty': 'Qty'},
{'Size': 'Size'},
{'Unit': 'Unit'},
{'Owned by Client': 'ClientOwned'},
{'Owned by Vendor': 'VendorOwned'},
],
}
def build_insert_dict(record: dict, section_mapping: list):
'''Using the mapping dictionary, build and return a dictionary of parameters that will be
passed to insert() and select().'''
return_data = {}
for pair in section_mapping:
for report_field in pair:
report_value = record.get(report_field)
db_field = pair.get(report_field)
return_data.update({db_field: report_value})
return return_data
if __name__ == '__main__':
# Start the timer
start = timer()
# Create an instance of the database object
db = TestDatabase()
# Load the data from the sample file
logger.info('Loading records from sample file...')
data_df = pd.read_excel(pd.ExcelFile(data_file), 'Sheet1')
new_data_df = data_df.astype(object).where(data_df.notna(), None)
data = new_data_df.to_dict('records')
# Insert each record from the data file into the database. If a record already exists,
# get the ID for that record and return it.
for record in data:
container_data = build_insert_dict(record, mapping.get('Containers'))
try:
ins = db.db_containers.insert().values(**container_data)
res = db.conn.execute(ins)
container_id = res.inserted_primary_key[0]
except IntegrityError as err:
logger.info(f'Record already exists in the database. Attempting to find the ID for that record...')
sel = select(db.db_containers.c.ContainerID).filter_by(**container_data)
res = db.conn.execute(sel)
old_container_id = res.fetchone()
if not old_container_id:
logger.error('Failed to get previously inserted container id! Investigate')
exit(-1)
else:
container_id = old_container_id[0]
logger.info(f'Found ID {container_id}')
stop = timer()
done = stop - start
migration_time = datetime.timedelta(seconds=done)
logger.success(f'Inserted all unique records into the database in {migration_time}')
SELECT错误
(oaktier-env) PS C:\Oaktier\oaktier\tasks> python .\upsert_test.py
2022-04-21 11:27:26.670 | INFO | __main__:<module>:76 - Loading records from sample file...
2022-04-21 11:27:27.205 | INFO | __main__:<module>:91 - Record already exists in the database. Attempting to find the ID for that record...
2022-04-21 11:27:27.210 | INFO | __main__:<module>:100 - Found ID 1
2022-04-21 11:27:27.211 | INFO | __main__:<module>:91 - Record already exists in the database. Attempting to find the ID for that record...
2022-04-21 11:27:27.215 | INFO | __main__:<module>:100 - Found ID 2
2022-04-21 11:27:27.216 | INFO | __main__:<module>:91 - Record already exists in the database. Attempting to find the ID for that record...
Traceback (most recent call last):
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1819, in _execute_context
self.dialect.do_execute(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\default.py", line 732, in do_execute
cursor.execute(statement, parameters)
sqlite3.IntegrityError: UNIQUE constraint failed: Containers.ContainerName, Containers.Qty, Containers.Size, Containers.Unit, Containers.ClientOwned, Containers.VendorOwned
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "C:\Oaktier\oaktier\tasks\upsert_test.py", line 88, in <module>
res = db.conn.execute(ins)
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1306, in execute
return meth(self, multiparams, params, _EMPTY_EXECUTION_OPTS)
File "C:\oaktier-env\lib\site-packages\sqlalchemy\sql\elements.py", line 325, in _execute_on_connection
return connection._execute_clauseelement(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1498, in _execute_clauseelement
ret = self._execute_context(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1862, in _execute_context
self._handle_dbapi_exception(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 2043, in _handle_dbapi_exception
util.raise_(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\util\compat.py", line 207, in raise_
raise exception
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1819, in _execute_context
self.dialect.do_execute(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\default.py", line 732, in do_execute
cursor.execute(statement, parameters)
sqlalchemy.exc.IntegrityError: (sqlite3.IntegrityError) UNIQUE constraint failed: Containers.ContainerName, Containers.Qty, Containers.Size, Containers.Unit, Containers.ClientOwned, Containers.VendorOwned
[SQL: INSERT INTO "Containers" ("ContainerName", "Qty", "Size", "Unit", "ClientOwned", "VendorOwned") VALUES (?, ?, ?, ?, ?, ?)]
[parameters: ('Dumpster (FEL/REL)', 1, 2.0, 'cu. Yd.', 'None', 'Dumpster (FEL/REL)')]
(Background on this error at: https://sqlalche.me/e/14/gkpj)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1819, in _execute_context
self.dialect.do_execute(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\default.py", line 732, in do_execute
cursor.execute(statement, parameters)
sqlite3.InterfaceError: Error binding parameter 0 - probably unsupported type.
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "C:\Oaktier\oaktier\tasks\upsert_test.py", line 93, in <module>
res = db.conn.execute(sel)
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1306, in execute
return meth(self, multiparams, params, _EMPTY_EXECUTION_OPTS)
File "C:\oaktier-env\lib\site-packages\sqlalchemy\sql\elements.py", line 325, in _execute_on_connection
return connection._execute_clauseelement(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1498, in _execute_clauseelement
ret = self._execute_context(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1862, in _execute_context
self._handle_dbapi_exception(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 2043, in _handle_dbapi_exception
util.raise_(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\util\compat.py", line 207, in raise_
raise exception
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1819, in _execute_context
self.dialect.do_execute(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\default.py", line 732, in do_execute
cursor.execute(statement, parameters)
sqlalchemy.exc.InterfaceError: (sqlite3.InterfaceError) Error binding parameter 0 - probably unsupported type.
[SQL: SELECT "Containers"."ContainerID"
FROM "Containers"
WHERE "Containers"."ContainerName" = ? AND "Containers"."Qty" = ? AND "Containers"."Size" = ? AND "Containers"."Unit" = ? AND "Containers"."ClientOwned" = ? AND "Containers"."VendorOwned" = ?]
[parameters: ('Dumpster (FEL/REL)', 1, 2.0, 'cu. Yd.', 'None', 'Dumpster (FEL/REL)')]
(Background on this error at: https://sqlalche.me/e/14/rvf5)
根据关于 UNIQUE
约束的 SQLite 文档:
For the purposes of UNIQUE constraints, NULL values are considered distinct from all other values, including other NULLs.
因此,不会忽略 NULL,而是将它们视为唯一实体。事实上,即使 Python 中的 None
和 Pandas/Numpy 中的 NaN
也不相等。因此,对于 SQLite,您使用 NULL
突出显示的行不是重复行,而是实际上唯一的行。对于空字符串,可能存在空格问题。
作为解决方法,请考虑在插入查询后填写 non-None 占位符或空字符串以替换为 NULL
。避免将整个数据框转换为相同类型,但根据需要明确处理每一列。
with pd.ExcelFile(data_file) as wb:
raw_df = pd.read_excel(wb, 'Sheet1')
# CLEAN EACH STRING COLUMN
clean_df = (
raw_df.assign(
ContainerName = lambda x: x["ContainerName"].astype("string").str.strip(),
Unit = lambda x: x["Unit"].astype("string").str.strip(),
ClientOwned = lambda x: x["ClientOwned"].astype("string").str.strip(),
VendorOwned = lambda x: x["VendorOwned"].astype("string").str.strip()
).where(data_df.notna(), "")
)
db_data = clean_df.to_dict('records')
将以下翻译成 SQLAlchemy 语法:
UPDATE Containers SET ContainerName = NULL WHERE ContainerName = '';
UPDATE Containers SET Unit = NULL WHERE Unit = '';
UPDATE Containers SET ClientOwned = NULL WHERE ClientOwned = '';
UPDATE Containers SET VendorOwned = NULL WHERE VendorOwned = '';
我有一个记录列表,其中很多是重复的,我正在尝试使用 SQLAlchemy Core 1.4.35 和 Python 3.9.2.
如果模块尝试插入 table 中尚不存在的记录,我希望插入成功,并且我希望 return 为创建的主键那条记录。
但是如果模块试图插入一条已经存在的记录,那么我希望插入失败。我想要 SELECT 和 return 现有记录的 ID。
当我 运行 下面的代码时,我 运行 遇到了两个问题,我无法弄清楚是什么导致了这两个问题。
如果我删除了
有 UNIQUE 约束except IntegrityError as err
行下缩进的所有代码,那么模块会成功完成。但是数据库 table 有一堆重复的记录——尽管 table.如果我保留该代码,并在处理异常时尝试 SELECT 现有记录的 ID - 我会收到以下错误。
这两个问题是什么引起的?
如何防止重复记录被插入,以及SELECT现有记录的ID?
我还在学习 SQL 所以我可能遗漏了一些明显的东西。
可能相关更新
我检查的所有重复记录都有一个共同点,就是其中一个字段包含一个NULL。 SQLite 在检查 UNIQUE table 约束时是否无法识别 NULL?
输入数据和table内容
下面链接的电子表格包含我在本次测试中使用的示例数据,以及 运行 执行以下代码后数据库 table 的内容。以黄色突出显示的记录是 table.
中的重复记录测试数据库
BEGIN TRANSACTION;
CREATE TABLE IF NOT EXISTS "Containers" (
"ContainerID" INTEGER NOT NULL UNIQUE,
"ContainerName" TEXT,
"Qty" INTEGER,
"Size" INTEGER,
"Unit" TEXT,
"ClientOwned" TEXT,
"VendorOwned" TEXT,
PRIMARY KEY("ContainerID"),
UNIQUE("ContainerName","Qty","Size","Unit","ClientOwned","VendorOwned")
);
COMMIT;
测试Python模块
# Built-In
import datetime
import sys
from timeit import default_timer as timer
# Third-Party
import pandas as pd
import pprint
import sqlalchemy
from loguru import logger
from sqlalchemy import create_engine, MetaData, Table, select
from sqlalchemy.exc import IntegrityError
db_file = 'upsert_bug_test.db'
data_file = 'container_sample_data.xlsx'
pp = pprint.PrettyPrinter(indent=3)
class TestDatabase:
def __init__(self):
self.engine = create_engine(f'sqlite:///{db_file}')
self.conn = self.engine.connect()
self.metadata_obj = MetaData()
self._reflect_db()
def _reflect_db(self):
"""Get the schema for the tables and views that already exist in the database."""
self.db_containers = Table('Containers', self.metadata_obj, autoload_with=self.engine)
# Dictionary to map the fields in the data file to fields in the database
# Mapping key = database table
# Mapping value = list of field dictionaries
# Field key = name of field in report
# Field value = name of field in database table
mapping = {
'Containers': [
{'Container': 'ContainerName'},
{'Qty': 'Qty'},
{'Size': 'Size'},
{'Unit': 'Unit'},
{'Owned by Client': 'ClientOwned'},
{'Owned by Vendor': 'VendorOwned'},
],
}
def build_insert_dict(record: dict, section_mapping: list):
'''Using the mapping dictionary, build and return a dictionary of parameters that will be
passed to insert() and select().'''
return_data = {}
for pair in section_mapping:
for report_field in pair:
report_value = record.get(report_field)
db_field = pair.get(report_field)
return_data.update({db_field: report_value})
return return_data
if __name__ == '__main__':
# Start the timer
start = timer()
# Create an instance of the database object
db = TestDatabase()
# Load the data from the sample file
logger.info('Loading records from sample file...')
data_df = pd.read_excel(pd.ExcelFile(data_file), 'Sheet1')
new_data_df = data_df.astype(object).where(data_df.notna(), None)
data = new_data_df.to_dict('records')
# Insert each record from the data file into the database. If a record already exists,
# get the ID for that record and return it.
for record in data:
container_data = build_insert_dict(record, mapping.get('Containers'))
try:
ins = db.db_containers.insert().values(**container_data)
res = db.conn.execute(ins)
container_id = res.inserted_primary_key[0]
except IntegrityError as err:
logger.info(f'Record already exists in the database. Attempting to find the ID for that record...')
sel = select(db.db_containers.c.ContainerID).filter_by(**container_data)
res = db.conn.execute(sel)
old_container_id = res.fetchone()
if not old_container_id:
logger.error('Failed to get previously inserted container id! Investigate')
exit(-1)
else:
container_id = old_container_id[0]
logger.info(f'Found ID {container_id}')
stop = timer()
done = stop - start
migration_time = datetime.timedelta(seconds=done)
logger.success(f'Inserted all unique records into the database in {migration_time}')
SELECT错误
(oaktier-env) PS C:\Oaktier\oaktier\tasks> python .\upsert_test.py
2022-04-21 11:27:26.670 | INFO | __main__:<module>:76 - Loading records from sample file...
2022-04-21 11:27:27.205 | INFO | __main__:<module>:91 - Record already exists in the database. Attempting to find the ID for that record...
2022-04-21 11:27:27.210 | INFO | __main__:<module>:100 - Found ID 1
2022-04-21 11:27:27.211 | INFO | __main__:<module>:91 - Record already exists in the database. Attempting to find the ID for that record...
2022-04-21 11:27:27.215 | INFO | __main__:<module>:100 - Found ID 2
2022-04-21 11:27:27.216 | INFO | __main__:<module>:91 - Record already exists in the database. Attempting to find the ID for that record...
Traceback (most recent call last):
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1819, in _execute_context
self.dialect.do_execute(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\default.py", line 732, in do_execute
cursor.execute(statement, parameters)
sqlite3.IntegrityError: UNIQUE constraint failed: Containers.ContainerName, Containers.Qty, Containers.Size, Containers.Unit, Containers.ClientOwned, Containers.VendorOwned
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "C:\Oaktier\oaktier\tasks\upsert_test.py", line 88, in <module>
res = db.conn.execute(ins)
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1306, in execute
return meth(self, multiparams, params, _EMPTY_EXECUTION_OPTS)
File "C:\oaktier-env\lib\site-packages\sqlalchemy\sql\elements.py", line 325, in _execute_on_connection
return connection._execute_clauseelement(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1498, in _execute_clauseelement
ret = self._execute_context(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1862, in _execute_context
self._handle_dbapi_exception(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 2043, in _handle_dbapi_exception
util.raise_(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\util\compat.py", line 207, in raise_
raise exception
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1819, in _execute_context
self.dialect.do_execute(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\default.py", line 732, in do_execute
cursor.execute(statement, parameters)
sqlalchemy.exc.IntegrityError: (sqlite3.IntegrityError) UNIQUE constraint failed: Containers.ContainerName, Containers.Qty, Containers.Size, Containers.Unit, Containers.ClientOwned, Containers.VendorOwned
[SQL: INSERT INTO "Containers" ("ContainerName", "Qty", "Size", "Unit", "ClientOwned", "VendorOwned") VALUES (?, ?, ?, ?, ?, ?)]
[parameters: ('Dumpster (FEL/REL)', 1, 2.0, 'cu. Yd.', 'None', 'Dumpster (FEL/REL)')]
(Background on this error at: https://sqlalche.me/e/14/gkpj)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1819, in _execute_context
self.dialect.do_execute(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\default.py", line 732, in do_execute
cursor.execute(statement, parameters)
sqlite3.InterfaceError: Error binding parameter 0 - probably unsupported type.
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "C:\Oaktier\oaktier\tasks\upsert_test.py", line 93, in <module>
res = db.conn.execute(sel)
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1306, in execute
return meth(self, multiparams, params, _EMPTY_EXECUTION_OPTS)
File "C:\oaktier-env\lib\site-packages\sqlalchemy\sql\elements.py", line 325, in _execute_on_connection
return connection._execute_clauseelement(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1498, in _execute_clauseelement
ret = self._execute_context(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1862, in _execute_context
self._handle_dbapi_exception(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 2043, in _handle_dbapi_exception
util.raise_(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\util\compat.py", line 207, in raise_
raise exception
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1819, in _execute_context
self.dialect.do_execute(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\default.py", line 732, in do_execute
cursor.execute(statement, parameters)
sqlalchemy.exc.InterfaceError: (sqlite3.InterfaceError) Error binding parameter 0 - probably unsupported type.
[SQL: SELECT "Containers"."ContainerID"
FROM "Containers"
WHERE "Containers"."ContainerName" = ? AND "Containers"."Qty" = ? AND "Containers"."Size" = ? AND "Containers"."Unit" = ? AND "Containers"."ClientOwned" = ? AND "Containers"."VendorOwned" = ?]
[parameters: ('Dumpster (FEL/REL)', 1, 2.0, 'cu. Yd.', 'None', 'Dumpster (FEL/REL)')]
(Background on this error at: https://sqlalche.me/e/14/rvf5)
根据关于 UNIQUE
约束的 SQLite 文档:
For the purposes of UNIQUE constraints, NULL values are considered distinct from all other values, including other NULLs.
因此,不会忽略 NULL,而是将它们视为唯一实体。事实上,即使 Python 中的 None
和 Pandas/Numpy 中的 NaN
也不相等。因此,对于 SQLite,您使用 NULL
突出显示的行不是重复行,而是实际上唯一的行。对于空字符串,可能存在空格问题。
作为解决方法,请考虑在插入查询后填写 non-None 占位符或空字符串以替换为 NULL
。避免将整个数据框转换为相同类型,但根据需要明确处理每一列。
with pd.ExcelFile(data_file) as wb:
raw_df = pd.read_excel(wb, 'Sheet1')
# CLEAN EACH STRING COLUMN
clean_df = (
raw_df.assign(
ContainerName = lambda x: x["ContainerName"].astype("string").str.strip(),
Unit = lambda x: x["Unit"].astype("string").str.strip(),
ClientOwned = lambda x: x["ClientOwned"].astype("string").str.strip(),
VendorOwned = lambda x: x["VendorOwned"].astype("string").str.strip()
).where(data_df.notna(), "")
)
db_data = clean_df.to_dict('records')
将以下翻译成 SQLAlchemy 语法:
UPDATE Containers SET ContainerName = NULL WHERE ContainerName = '';
UPDATE Containers SET Unit = NULL WHERE Unit = '';
UPDATE Containers SET ClientOwned = NULL WHERE ClientOwned = '';
UPDATE Containers SET VendorOwned = NULL WHERE VendorOwned = '';