SQLAlchemy Core - python 字典列表的高效 UPSERT Mysql

SQLAlchemy Core - Efficient UPSERT of a python list of dictionaries with Mysql

假设现有 mysql table,“用户”。
假设单列主键“id”。
假设要插入的数据总是以字典列表的形式给出,形式为:
[{'column_name1':'valueA', 'column_name2':'valueB'}, {'column_name1':'valueC', 'column_name2':'valueD'}].

如果插入的行具有相同的主键(也称为 ID),我只想更新所有其他列的值。

data1 = [{'id': 1, 'name': 'flo', 'role': 'admin', 'number': 121, 'text': 'text1'}, {'id': 2, 'name': 'foo', 'role': 'user', 'number': 567, 'text': 'text2'}, {'id': 3, 'name': 'banana', 'role': 'user', 'number': 890, 'text': 'text3'}]
data2 = [{'id': 3, 'name': 'bar', 'role': 'user', 'number': 56777, 'text': 'text4'}, {'id': 4, 'name': 'james', 'role': 'user', 'number': 999890, 'text': 'text5'}]

from sqlalchemy import create_engine
from sqlalchemy import MetaData
from sqlalchemy import Table

engine = create_engine(connectionString)
metadata = MetaData(engine)
table = Table('user', metadata, autoload=True)
#assuming user table is empty
engine.execute(table.insert(), data1)
bulk_insert = prepare_bulk_upsert_statement(data2)
engine.execute(bulk_insert)

我知道 SQLAlchemy 确实有一个 on_duplicate_key_update 方法我可以在 sqlalchemy.dialects.mysql.insert 中使用。但是从这个例子中,我无法弄清楚我的 prepare_bulk_upsert_statement 函数会是什么样子。最终,用户 table 的内容应该是这样的:

query = table.select()  
print ([r._asdict() for r in engine.execute(query)])

>
[{'id': 1, 'name': 'flo', 'role': 'admin', 'number': 121, 'text': 'text1'},  
{'id': 2, 'name': 'foo', 'role': 'user', 'number': 567, 'text': 'text2'},
{'id': 3, 'name': 'bar', 'role': 'user', 'number': 56777, 'text': 'text4'},
{'id': 4, 'name': 'james', 'role': 'user', 'number': 999890, 'text': 'text5'}]

查看 SQLAlchemy 示例:

from sqlalchemy.dialects.mysql import insert
insert_stmt = insert(my_table).values(
     id='some_existing_id',
     data='inserted value')

on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(
    data=insert_stmt.inserted.data,
    status='U'
)

似乎on_duplicate_key_update 只能处理一行(又名一本字典)。有没有一种有效的方法来使用这种方法进行更新?或者有更好的方法吗?

我选择了这个:

        insert_stmt = insert(table).values(data2)
        primKeyColNames = [pk_column.name for pk_column in table.primary_key.columns.values()]
        updatedColNames = [column.name for column in table.columns if column.name not in primKeyColNames]
        onDuplicate = {colName:getattr(insert_stmt.inserted, colName) for colName in updatedColNames}
        on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(onDuplicate)
        engine.execute(on_duplicate_key_stmt)

获取主键(可以是多个列),将它们从列列表中删除,使用该列表为 on_duplicate_key_update 创建字典并传递它执行.