使用 SQLAlchemy Postgres 批量更新插入
Bulk Upsert with SQLAlchemy Postgres
我正在按照 SQLAlchemy 文档 here 使用 Postgres 编写批量更新插入语句。出于演示目的,我有一个简单的 table MyTable
:
class MyTable(base):
__tablename__ = 'mytable'
id = Column(types.Integer, primary_key=True)
test_value = Column(types.Text)
创建通用插入语句非常简单:
from sqlalchemy.dialects import postgresql
values = [{'id': 0, 'test_value': 'a'}, {'id': 1, 'test_value': 'b'}]
insert_stmt = postgresql.insert(MyTable.__table__).values(values)
我 运行 遇到的问题是当我尝试添加更新插入的 "on conflict" 部分时。
update_stmt = insert_stmt.on_conflict_do_update(
index_elements=[MyTable.id],
set_=dict(data=values)
)
尝试执行此语句会产生 ProgrammingError
:
from sqlalchemy import create_engine
engine = create_engine('postgres://localhost/db_name')
engine.execute(update_stmt)
>>> ProgrammingError: (psycopg2.ProgrammingError) can't adapt type 'dict'
我认为我的误解在于用on_conflict_do_update
方法构造语句。有谁知道如何构造这个语句?我查看了 Whosebug 上的其他问题(例如 ),但我似乎无法解决上述错误。
update_stmt = insert_stmt.on_conflict_do_update(
index_elements=[MyTable.id],
set_=dict(data=values)
)
index_elements 应该是字符串列表或列对象列表。所以 [MyTable.id]
或 ['id']
(这是正确的)
set_ 应该是一个字典,其中列名作为键,有效的 sql 更新对象作为值。您可以使用 excluded
属性从插入块中引用值。因此,为了获得您希望的结果,您需要 set_={'test_value': insert_stmt.excluded.test_value}
(您犯的错误是示例中的 data=
不是一个神奇的论点......它是列的名称他们的例子 table)
所以,整个事情就是
update_stmt = insert_stmt.on_conflict_do_update(
index_elements=[MyTable.id],
set_={'test_value': insert_stmt.excluded.test_value}
)
当然,在现实世界的例子中,我通常想要更改不止一列。在那种情况下,我会做类似...
update_columns = {col.name: col for col in insert_stmt.excluded if col.name not in ('id', 'datetime_created')}
update_statement = insert_stmt.on_conflict_do_update(index_elements=['id'], set_=update_columns)
(此示例将覆盖除 id 和 datetime_created 列之外的所有列)
我正在按照 SQLAlchemy 文档 here 使用 Postgres 编写批量更新插入语句。出于演示目的,我有一个简单的 table MyTable
:
class MyTable(base):
__tablename__ = 'mytable'
id = Column(types.Integer, primary_key=True)
test_value = Column(types.Text)
创建通用插入语句非常简单:
from sqlalchemy.dialects import postgresql
values = [{'id': 0, 'test_value': 'a'}, {'id': 1, 'test_value': 'b'}]
insert_stmt = postgresql.insert(MyTable.__table__).values(values)
我 运行 遇到的问题是当我尝试添加更新插入的 "on conflict" 部分时。
update_stmt = insert_stmt.on_conflict_do_update(
index_elements=[MyTable.id],
set_=dict(data=values)
)
尝试执行此语句会产生 ProgrammingError
:
from sqlalchemy import create_engine
engine = create_engine('postgres://localhost/db_name')
engine.execute(update_stmt)
>>> ProgrammingError: (psycopg2.ProgrammingError) can't adapt type 'dict'
我认为我的误解在于用on_conflict_do_update
方法构造语句。有谁知道如何构造这个语句?我查看了 Whosebug 上的其他问题(例如
update_stmt = insert_stmt.on_conflict_do_update(
index_elements=[MyTable.id],
set_=dict(data=values)
)
index_elements 应该是字符串列表或列对象列表。所以 [MyTable.id]
或 ['id']
(这是正确的)
set_ 应该是一个字典,其中列名作为键,有效的 sql 更新对象作为值。您可以使用 excluded
属性从插入块中引用值。因此,为了获得您希望的结果,您需要 set_={'test_value': insert_stmt.excluded.test_value}
(您犯的错误是示例中的 data=
不是一个神奇的论点......它是列的名称他们的例子 table)
所以,整个事情就是
update_stmt = insert_stmt.on_conflict_do_update(
index_elements=[MyTable.id],
set_={'test_value': insert_stmt.excluded.test_value}
)
当然,在现实世界的例子中,我通常想要更改不止一列。在那种情况下,我会做类似...
update_columns = {col.name: col for col in insert_stmt.excluded if col.name not in ('id', 'datetime_created')}
update_statement = insert_stmt.on_conflict_do_update(index_elements=['id'], set_=update_columns)
(此示例将覆盖除 id 和 datetime_created 列之外的所有列)