如何在 mySQL 中插入以便它可以与 sqlite3 一起使用?
How to upsert in mySQL so it will work with sqlite3?
我需要测试一个 python flask 应用程序,它使用 mySQL 来 运行 它的查询使用 sqlalchemy 和 sqlite3。
我在尝试使用 ON DUPLICATE
子句测试更新插入函数时遇到异常:
(sqlite3.OperationalError) near "DUPLICATE": syntax error
经过简短的搜索解决方案后,我发现 sqlite 执行 upsert 查询的正确语法是 ON CONFLICT(id) DO UPDATE SET ...
,我试过了,但 mySQL 不认识这个语法。
我能做什么?我怎样才能做一个 upsert 查询,以便 sqlite3 和 mySQL 都能正确执行它?
示例:
员工table:
id
name
1
Jeff Bezos
2
Bill Gates
INSERT INTO employees(id,name)
VALUES(1, 'Donald Trump')
ON DUPLICATE KEY UPDATE name = VALUES(name);
应将 table 更新为:
id
name
1
Donald Trump
2
Bill Gates
提前致谢!
How can I do an upsert query so sqlite3 and mySQL will both execute it properly?
您可以通过尝试 UPDATE 获得相同的结果,如果没有找到匹配则执行 INSERT。以下代码使用 SQLAlchemy Core 结构,进一步保护 MySQL 和 SQLite 之间的细微差别。例如,如果您的 table 有一个名为“order”的列,那么 SQLAlchemy 将为 MySQL 发出此 DDL …
CREATE TABLE employees (
id INTEGER NOT NULL,
name VARCHAR(50),
`order` INTEGER,
PRIMARY KEY (id)
)
…和这个 SQLite 的 DDL
CREATE TABLE employees (
id INTEGER NOT NULL,
name VARCHAR(50),
"order" INTEGER,
PRIMARY KEY (id)
)
import logging
import sqlalchemy as sa
# pick one
connection_url = "mysql+mysqldb://scott:tiger@localhost:3307/mydb"
# connection_url = "sqlite://"
engine = sa.create_engine(connection_url)
def _dump_table():
with engine.begin() as conn:
print(conn.exec_driver_sql("SELECT * FROM employees").all())
def _setup_example():
employees = sa.Table(
"employees",
sa.MetaData(),
sa.Column("id", sa.Integer, primary_key=True, autoincrement=False),
sa.Column("name", sa.String(50)),
)
employees.drop(engine, checkfirst=True)
employees.create(engine)
# create initial example data
with engine.begin() as conn:
conn.execute(
employees.insert(),
[{"id": 1, "name": "Jeff Bezos"}, {"id": 2, "name": "Bill Gates"}],
)
def upsert_employee(id_, name):
employees = sa.Table("employees", sa.MetaData(), autoload_with=engine)
with engine.begin() as conn:
result = conn.execute(
employees.update().where(employees.c.id == id_), {"name": name}
)
logging.debug(f" {result.rowcount} row(s) updated.")
if result.rowcount == 0:
result = conn.execute(
employees.insert(), {"id": id_, "name": name}
)
logging.debug(f" {result.rowcount} row(s) inserted.")
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
_setup_example()
_dump_table()
"""
[(1, 'Jeff Bezos'), (2, 'Bill Gates')]
"""
upsert_employee(3, "Donald Trump")
"""
DEBUG:root: 0 row(s) updated.
DEBUG:root: 1 row(s) inserted.
"""
_dump_table()
"""
[(1, 'Jeff Bezos'), (2, 'Bill Gates'), (3, 'Donald Trump')]
"""
upsert_employee(1, "Elon Musk")
"""
DEBUG:root: 1 row(s) updated.
"""
_dump_table()
"""
[(1, 'Elon Musk'), (2, 'Bill Gates'), (3, 'Donald Trump')]
"""
我需要测试一个 python flask 应用程序,它使用 mySQL 来 运行 它的查询使用 sqlalchemy 和 sqlite3。
我在尝试使用 ON DUPLICATE
子句测试更新插入函数时遇到异常:
(sqlite3.OperationalError) near "DUPLICATE": syntax error
经过简短的搜索解决方案后,我发现 sqlite 执行 upsert 查询的正确语法是 ON CONFLICT(id) DO UPDATE SET ...
,我试过了,但 mySQL 不认识这个语法。
我能做什么?我怎样才能做一个 upsert 查询,以便 sqlite3 和 mySQL 都能正确执行它?
示例:
员工table:
id | name |
---|---|
1 | Jeff Bezos |
2 | Bill Gates |
INSERT INTO employees(id,name)
VALUES(1, 'Donald Trump')
ON DUPLICATE KEY UPDATE name = VALUES(name);
应将 table 更新为:
id | name |
---|---|
1 | Donald Trump |
2 | Bill Gates |
提前致谢!
How can I do an upsert query so sqlite3 and mySQL will both execute it properly?
您可以通过尝试 UPDATE 获得相同的结果,如果没有找到匹配则执行 INSERT。以下代码使用 SQLAlchemy Core 结构,进一步保护 MySQL 和 SQLite 之间的细微差别。例如,如果您的 table 有一个名为“order”的列,那么 SQLAlchemy 将为 MySQL 发出此 DDL …
CREATE TABLE employees (
id INTEGER NOT NULL,
name VARCHAR(50),
`order` INTEGER,
PRIMARY KEY (id)
)
…和这个 SQLite 的 DDL
CREATE TABLE employees (
id INTEGER NOT NULL,
name VARCHAR(50),
"order" INTEGER,
PRIMARY KEY (id)
)
import logging
import sqlalchemy as sa
# pick one
connection_url = "mysql+mysqldb://scott:tiger@localhost:3307/mydb"
# connection_url = "sqlite://"
engine = sa.create_engine(connection_url)
def _dump_table():
with engine.begin() as conn:
print(conn.exec_driver_sql("SELECT * FROM employees").all())
def _setup_example():
employees = sa.Table(
"employees",
sa.MetaData(),
sa.Column("id", sa.Integer, primary_key=True, autoincrement=False),
sa.Column("name", sa.String(50)),
)
employees.drop(engine, checkfirst=True)
employees.create(engine)
# create initial example data
with engine.begin() as conn:
conn.execute(
employees.insert(),
[{"id": 1, "name": "Jeff Bezos"}, {"id": 2, "name": "Bill Gates"}],
)
def upsert_employee(id_, name):
employees = sa.Table("employees", sa.MetaData(), autoload_with=engine)
with engine.begin() as conn:
result = conn.execute(
employees.update().where(employees.c.id == id_), {"name": name}
)
logging.debug(f" {result.rowcount} row(s) updated.")
if result.rowcount == 0:
result = conn.execute(
employees.insert(), {"id": id_, "name": name}
)
logging.debug(f" {result.rowcount} row(s) inserted.")
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
_setup_example()
_dump_table()
"""
[(1, 'Jeff Bezos'), (2, 'Bill Gates')]
"""
upsert_employee(3, "Donald Trump")
"""
DEBUG:root: 0 row(s) updated.
DEBUG:root: 1 row(s) inserted.
"""
_dump_table()
"""
[(1, 'Jeff Bezos'), (2, 'Bill Gates'), (3, 'Donald Trump')]
"""
upsert_employee(1, "Elon Musk")
"""
DEBUG:root: 1 row(s) updated.
"""
_dump_table()
"""
[(1, 'Elon Musk'), (2, 'Bill Gates'), (3, 'Donald Trump')]
"""