使用 SQLAlchemy 执行更新语句不会更新行,但语句在 MySQL Workbench 中执行时可以完美运行

Executing update statement with SQLAlchemy does not update rows, but statement works perfectly in when executed in MySQL Workbench

我有以下代码:

from sqlalchemy import create_engine

SCHEMA = 'dev_gba'
TABLE = 'dev_l1c_v2'
USER = 'db-user'
PASSWORD = '-'
ENDPOINT = '-.us-east-1.rds.amazonaws.com'
process_start = 'SOME_VAL'
process_end = 'SOME_VAL'
granule_id = 'A006202_20160829T191558'

engine = create_engine(f"mysql+pymysql://{USER}:{PASSWORD}@{ENDPOINT}/{SCHEMA}")
connection = engine.raw_connection()
try:
    cursor_obj = connection.cursor()        
    cursor_obj.execute(f'UPDATE {SCHEMA}.{TABLE} SET PROCESS_START_TIME = "{process_start}", PROCESS_END_TIME = "{process_end}" WHERE {SCHEMA}.{TABLE}.GRANULE_ID  = "{granule_id}"')
    cursor_obj.close()
finally:
    connection.close()

如果我select全部来自数据库,那么我可以看到未更新的行。但是,如果我像这样打印语句:

print(f'UPDATE {SCHEMA}.{TABLE} SET PROCESS_START_TIME = "{process_start}", PROCESS_END_TIME = "{process_end}" WHERE {SCHEMA}.{TABLE}.GRANULE_ID = "{granule_id}"')

输出为:

UPDATE dev_gba.dev_l1c_v2 SET PROCESS_START_TIME = "SOME_VAL", PROCESS_END_TIME = "SOME_VAL" WHERE dev_gba.dev_l1c_v2.GRANULE_ID = "A006202_20160829T191558"

如果我将其复制并粘贴到 MySQL workbench,它将执行,并且我可以看到行已更新。

我在 workbench 中禁用了安全更新,并尝试在我的执行语句之前添加它作为

cursor_obj.execute('SET SQL_SAFE_UPDATES = 0')

但这也行不通。这是另一件令人困惑的事情,在我的代码前面我 运行 以下内容:

connection = engine.raw_connection()
try:
    cursor_obj = connection.cursor()
    cursor_obj.execute(f'CREATE TEMPORARY TABLE {TEMP_TABLE} SELECT {TABLE}.index FROM {SCHEMA}.{TABLE} WHERE IN_PROGRESS = 0 AND PROCESSED = 0 ORDER BY RAND() LIMIT {CPU_COUNT}')
    cursor_obj.execute(f'UPDATE {SCHEMA}.{TABLE} SET IN_PROGRESS = 1, INSTANCE_ID = "{INSTANCE_ID}" WHERE {SCHEMA}.{TABLE}.index IN (SELECT {TEMP_TABLE}.index FROM {TEMP_TABLE})')
    cursor_obj.execute(f'SELECT BASE_URL FROM {SCHEMA}.{TABLE} WHERE {SCHEMA}.{TABLE}.index IN (SELECT {TEMP_TABLE}.index FROM {TEMP_TABLE})')
    result = cursor_obj.fetchall()
    cursor_obj.execute(f'DROP TABLE {TEMP_TABLE}')
    cursor_obj.close()
finally:
    connection.close()

这段代码中的更新语句工作正常,没有任何问题。我还尝试将 echo=True 添加到我的创建引擎行:

engine = create_engine(f"mysql+pymysql://{USER}:{PASSWORD}@{ENDPOINT}/{SCHEMA}", echo = True)

输出为:

2021-12-31 10:17:09,613 信息 sqlalchemy.engine.Engine 显示变量 'sql_mode'

2021-12-31 10:17:09,616 信息 sqlalchemy.engine.Engine [原始 sql] {}

2021-12-31 10:17:09,700 信息 sqlalchemy.engine.Engine 显示变量 'lower_case_table_names'

2021-12-31 10:17:09,701 信息 sqlalchemy.engine.Engine [在 0.00143 秒内生成] {}

2021-12-31 10:17:09,858 信息 sqlalchemy.engine.Engine SELECT 数据库 ()

2021-12-31 10:17:09,859 信息 sqlalchemy.engine.Engine [原始 sql] {}

这不是很有用。

我也试过:

from sqlalchemy.sql import text
cursor_obj.execute(text(f'UPDATE {SCHEMA}.{TABLE} SET PROCESS_START_TIME = "{process_start}", PROCESS_END_TIME = "{process_end}" WHERE {SCHEMA}.{TABLE}.GRANULE_ID  = "{granule_id}"'))

这会产生以下错误:

TypeError: 'TextClause' 类型的对象没有 len()

不太确定从这里到哪里去。

使用字符串格式在 Python 中创建 SQL 语句容易出错,如果有更好的工具可用,应避免使用。

您可以像这样使用 SQLAlchemy core 运行 原始查询,而不必下拉到原始连接:

import sqlalchemy as sa

engine = create_engine(f"mysql+pymysql://{USER}:{PASSWORD}@{ENDPOINT}/{SCHEMA}")


# Reflect the database table into an object  
tbl = sa.Table(TABLE, sa.MetaData(), autoload_with=engine)
# Create an update object
upd = sa.update(tbl).where(tbl.c.GRANULE_ID == granule_id).values(PROCESS_START_TIME=process_start_time, PROCESS_END_TIME=process_end_time)

# The "begin" context manager will automatically commit on exit
with engine.begin() as conn:
    conn.execute(upd)

如果你需要使用raw SQL,你可以这样做(参见Using Textual SQL):

# We need to use string formatting to set the table; SQLAlchemy will automatically qualify it with the schema name.
stmt = f'UPDATE {TABLE} SET PROCESS_START_TIME = :process_start_time, PROCESS_END_TIME = :process_end_time WHERE {TABLE}.GRANULE_ID  = :granule_id'

values = {
    'process_start_time': process_start_time,
    'process_end_time': process_end_time,
    'granule_id': granule_id,
}

with engine.begin() as conn:
    conn.execute(sa.text(stmt), values)