使用 SQLAlchemy 执行更新语句不会更新行,但语句在 MySQL Workbench 中执行时可以完美运行
Executing update statement with SQLAlchemy does not update rows, but statement works perfectly in when executed in MySQL Workbench
我有以下代码:
from sqlalchemy import create_engine
SCHEMA = 'dev_gba'
TABLE = 'dev_l1c_v2'
USER = 'db-user'
PASSWORD = '-'
ENDPOINT = '-.us-east-1.rds.amazonaws.com'
process_start = 'SOME_VAL'
process_end = 'SOME_VAL'
granule_id = 'A006202_20160829T191558'
engine = create_engine(f"mysql+pymysql://{USER}:{PASSWORD}@{ENDPOINT}/{SCHEMA}")
connection = engine.raw_connection()
try:
cursor_obj = connection.cursor()
cursor_obj.execute(f'UPDATE {SCHEMA}.{TABLE} SET PROCESS_START_TIME = "{process_start}", PROCESS_END_TIME = "{process_end}" WHERE {SCHEMA}.{TABLE}.GRANULE_ID = "{granule_id}"')
cursor_obj.close()
finally:
connection.close()
如果我select全部来自数据库,那么我可以看到未更新的行。但是,如果我像这样打印语句:
print(f'UPDATE {SCHEMA}.{TABLE} SET PROCESS_START_TIME = "{process_start}", PROCESS_END_TIME = "{process_end}" WHERE {SCHEMA}.{TABLE}.GRANULE_ID = "{granule_id}"')
输出为:
UPDATE dev_gba.dev_l1c_v2 SET PROCESS_START_TIME = "SOME_VAL", PROCESS_END_TIME = "SOME_VAL" WHERE dev_gba.dev_l1c_v2.GRANULE_ID = "A006202_20160829T191558"
如果我将其复制并粘贴到 MySQL workbench,它将执行,并且我可以看到行已更新。
我在 workbench 中禁用了安全更新,并尝试在我的执行语句之前添加它作为
cursor_obj.execute('SET SQL_SAFE_UPDATES = 0')
但这也行不通。这是另一件令人困惑的事情,在我的代码前面我 运行 以下内容:
connection = engine.raw_connection()
try:
cursor_obj = connection.cursor()
cursor_obj.execute(f'CREATE TEMPORARY TABLE {TEMP_TABLE} SELECT {TABLE}.index FROM {SCHEMA}.{TABLE} WHERE IN_PROGRESS = 0 AND PROCESSED = 0 ORDER BY RAND() LIMIT {CPU_COUNT}')
cursor_obj.execute(f'UPDATE {SCHEMA}.{TABLE} SET IN_PROGRESS = 1, INSTANCE_ID = "{INSTANCE_ID}" WHERE {SCHEMA}.{TABLE}.index IN (SELECT {TEMP_TABLE}.index FROM {TEMP_TABLE})')
cursor_obj.execute(f'SELECT BASE_URL FROM {SCHEMA}.{TABLE} WHERE {SCHEMA}.{TABLE}.index IN (SELECT {TEMP_TABLE}.index FROM {TEMP_TABLE})')
result = cursor_obj.fetchall()
cursor_obj.execute(f'DROP TABLE {TEMP_TABLE}')
cursor_obj.close()
finally:
connection.close()
这段代码中的更新语句工作正常,没有任何问题。我还尝试将 echo=True 添加到我的创建引擎行:
engine = create_engine(f"mysql+pymysql://{USER}:{PASSWORD}@{ENDPOINT}/{SCHEMA}", echo = True)
输出为:
2021-12-31 10:17:09,613 信息 sqlalchemy.engine.Engine 显示变量 'sql_mode'
2021-12-31 10:17:09,616 信息 sqlalchemy.engine.Engine [原始 sql] {}
2021-12-31 10:17:09,700 信息 sqlalchemy.engine.Engine 显示变量 'lower_case_table_names'
2021-12-31 10:17:09,701 信息 sqlalchemy.engine.Engine [在 0.00143 秒内生成] {}
2021-12-31 10:17:09,858 信息 sqlalchemy.engine.Engine SELECT 数据库 ()
2021-12-31 10:17:09,859 信息 sqlalchemy.engine.Engine [原始 sql] {}
这不是很有用。
我也试过:
from sqlalchemy.sql import text
cursor_obj.execute(text(f'UPDATE {SCHEMA}.{TABLE} SET PROCESS_START_TIME = "{process_start}", PROCESS_END_TIME = "{process_end}" WHERE {SCHEMA}.{TABLE}.GRANULE_ID = "{granule_id}"'))
这会产生以下错误:
TypeError: 'TextClause' 类型的对象没有 len()
不太确定从这里到哪里去。
使用字符串格式在 Python 中创建 SQL 语句容易出错,如果有更好的工具可用,应避免使用。
您可以像这样使用 SQLAlchemy core 运行 原始查询,而不必下拉到原始连接:
import sqlalchemy as sa
engine = create_engine(f"mysql+pymysql://{USER}:{PASSWORD}@{ENDPOINT}/{SCHEMA}")
# Reflect the database table into an object
tbl = sa.Table(TABLE, sa.MetaData(), autoload_with=engine)
# Create an update object
upd = sa.update(tbl).where(tbl.c.GRANULE_ID == granule_id).values(PROCESS_START_TIME=process_start_time, PROCESS_END_TIME=process_end_time)
# The "begin" context manager will automatically commit on exit
with engine.begin() as conn:
conn.execute(upd)
如果你需要使用raw SQL,你可以这样做(参见Using Textual SQL):
# We need to use string formatting to set the table; SQLAlchemy will automatically qualify it with the schema name.
stmt = f'UPDATE {TABLE} SET PROCESS_START_TIME = :process_start_time, PROCESS_END_TIME = :process_end_time WHERE {TABLE}.GRANULE_ID = :granule_id'
values = {
'process_start_time': process_start_time,
'process_end_time': process_end_time,
'granule_id': granule_id,
}
with engine.begin() as conn:
conn.execute(sa.text(stmt), values)
我有以下代码:
from sqlalchemy import create_engine
SCHEMA = 'dev_gba'
TABLE = 'dev_l1c_v2'
USER = 'db-user'
PASSWORD = '-'
ENDPOINT = '-.us-east-1.rds.amazonaws.com'
process_start = 'SOME_VAL'
process_end = 'SOME_VAL'
granule_id = 'A006202_20160829T191558'
engine = create_engine(f"mysql+pymysql://{USER}:{PASSWORD}@{ENDPOINT}/{SCHEMA}")
connection = engine.raw_connection()
try:
cursor_obj = connection.cursor()
cursor_obj.execute(f'UPDATE {SCHEMA}.{TABLE} SET PROCESS_START_TIME = "{process_start}", PROCESS_END_TIME = "{process_end}" WHERE {SCHEMA}.{TABLE}.GRANULE_ID = "{granule_id}"')
cursor_obj.close()
finally:
connection.close()
如果我select全部来自数据库,那么我可以看到未更新的行。但是,如果我像这样打印语句:
print(f'UPDATE {SCHEMA}.{TABLE} SET PROCESS_START_TIME = "{process_start}", PROCESS_END_TIME = "{process_end}" WHERE {SCHEMA}.{TABLE}.GRANULE_ID = "{granule_id}"')
输出为:
UPDATE dev_gba.dev_l1c_v2 SET PROCESS_START_TIME = "SOME_VAL", PROCESS_END_TIME = "SOME_VAL" WHERE dev_gba.dev_l1c_v2.GRANULE_ID = "A006202_20160829T191558"
如果我将其复制并粘贴到 MySQL workbench,它将执行,并且我可以看到行已更新。
我在 workbench 中禁用了安全更新,并尝试在我的执行语句之前添加它作为
cursor_obj.execute('SET SQL_SAFE_UPDATES = 0')
但这也行不通。这是另一件令人困惑的事情,在我的代码前面我 运行 以下内容:
connection = engine.raw_connection()
try:
cursor_obj = connection.cursor()
cursor_obj.execute(f'CREATE TEMPORARY TABLE {TEMP_TABLE} SELECT {TABLE}.index FROM {SCHEMA}.{TABLE} WHERE IN_PROGRESS = 0 AND PROCESSED = 0 ORDER BY RAND() LIMIT {CPU_COUNT}')
cursor_obj.execute(f'UPDATE {SCHEMA}.{TABLE} SET IN_PROGRESS = 1, INSTANCE_ID = "{INSTANCE_ID}" WHERE {SCHEMA}.{TABLE}.index IN (SELECT {TEMP_TABLE}.index FROM {TEMP_TABLE})')
cursor_obj.execute(f'SELECT BASE_URL FROM {SCHEMA}.{TABLE} WHERE {SCHEMA}.{TABLE}.index IN (SELECT {TEMP_TABLE}.index FROM {TEMP_TABLE})')
result = cursor_obj.fetchall()
cursor_obj.execute(f'DROP TABLE {TEMP_TABLE}')
cursor_obj.close()
finally:
connection.close()
这段代码中的更新语句工作正常,没有任何问题。我还尝试将 echo=True 添加到我的创建引擎行:
engine = create_engine(f"mysql+pymysql://{USER}:{PASSWORD}@{ENDPOINT}/{SCHEMA}", echo = True)
输出为:
2021-12-31 10:17:09,613 信息 sqlalchemy.engine.Engine 显示变量 'sql_mode'
2021-12-31 10:17:09,616 信息 sqlalchemy.engine.Engine [原始 sql] {}
2021-12-31 10:17:09,700 信息 sqlalchemy.engine.Engine 显示变量 'lower_case_table_names'
2021-12-31 10:17:09,701 信息 sqlalchemy.engine.Engine [在 0.00143 秒内生成] {}
2021-12-31 10:17:09,858 信息 sqlalchemy.engine.Engine SELECT 数据库 ()
2021-12-31 10:17:09,859 信息 sqlalchemy.engine.Engine [原始 sql] {}
这不是很有用。
我也试过:
from sqlalchemy.sql import text
cursor_obj.execute(text(f'UPDATE {SCHEMA}.{TABLE} SET PROCESS_START_TIME = "{process_start}", PROCESS_END_TIME = "{process_end}" WHERE {SCHEMA}.{TABLE}.GRANULE_ID = "{granule_id}"'))
这会产生以下错误:
TypeError: 'TextClause' 类型的对象没有 len()
不太确定从这里到哪里去。
使用字符串格式在 Python 中创建 SQL 语句容易出错,如果有更好的工具可用,应避免使用。
您可以像这样使用 SQLAlchemy core 运行 原始查询,而不必下拉到原始连接:
import sqlalchemy as sa
engine = create_engine(f"mysql+pymysql://{USER}:{PASSWORD}@{ENDPOINT}/{SCHEMA}")
# Reflect the database table into an object
tbl = sa.Table(TABLE, sa.MetaData(), autoload_with=engine)
# Create an update object
upd = sa.update(tbl).where(tbl.c.GRANULE_ID == granule_id).values(PROCESS_START_TIME=process_start_time, PROCESS_END_TIME=process_end_time)
# The "begin" context manager will automatically commit on exit
with engine.begin() as conn:
conn.execute(upd)
如果你需要使用raw SQL,你可以这样做(参见Using Textual SQL):
# We need to use string formatting to set the table; SQLAlchemy will automatically qualify it with the schema name.
stmt = f'UPDATE {TABLE} SET PROCESS_START_TIME = :process_start_time, PROCESS_END_TIME = :process_end_time WHERE {TABLE}.GRANULE_ID = :granule_id'
values = {
'process_start_time': process_start_time,
'process_end_time': process_end_time,
'granule_id': granule_id,
}
with engine.begin() as conn:
conn.execute(sa.text(stmt), values)