Python sqlalchemy 和 mySQL 存储过程总是 returns 0(仅输出参数)
Python sqlalchemy and mySQL stored procedure always returns 0 (out param only)
我正在尝试从 MySQL 存储过程中获取 ROW_COUNT()
到 python。
这是我得到的,但我不知道我错过了什么。
DELIMITER //
CREATE OR REPLACE PROCEDURE sp_refresh_mytable(
OUT row_count INT
)
BEGIN
DECLARE exit handler for SQLEXCEPTION
BEGIN
ROLLBACK;
END;
DECLARE exit handler for SQLWARNING
BEGIN
ROLLBACK;
END;
DECLARE exit handler FOR NOT FOUND
BEGIN
ROLLBACK;
END;
START TRANSACTION;
DELETE FROM mytable;
INSERT INTO mytable
(
col1
, col2
)
SELECT
col1
, col2
FROM othertable
;
SET row_count = ROW_COUNT();
COMMIT;
END //
DELIMITER ;
如果我像下面这样通过正常 SQL 调用它,我会得到正确的 row_count 插入操作(例如插入 26 行):
CALL sp_refresh_mytable(@rowcount);
select @rowcount as t;
-- output: 26
然后在python/mysqlalchemy:
def call_procedure(engine, function_name, params=None):
connection = engine.raw_connection()
try:
cursor = connection.cursor()
result = cursor.callproc('sp_refresh_mytable', [0])
## try result outputs
resultfetch = cursor.fetchone()
logger.info(result)
logger.info(result[0])
logger.info(resultfetch)
cursor.close()
connection.commit()
connection.close()
logger.info(f"Running procedure {function_name} success!")
return result
except Exception as e:
logger.error(f"Running procedure {function_name} failed!")
logger.exception(e)
return None
finally:
connection.close()
所以我尝试记录获取输出值的不同变体,但它始终为 0 或 None。
[INFO] db_update [0]
[INFO] db_update 0
[INFO] db_update None
我错过了什么?
谢谢!
在 this answer 的帮助下,我找到了以下适合我的解决方案。
a) 使用 engine.raw_connection()
和 cursor.callproc
的工作解决方案:
def call_procedure(engine, function_name):
connection = engine.raw_connection()
try:
cursor = connection.cursor()
cursor.callproc(function_name, [0])
cursor.execute(f"""SELECT @_{function_name}_0""")
results = cursor.fetchone() ## returns a tuple e.g. (285,)
rows_affected = results[0]
cursor.close()
connection.commit()
logger.info(f"Running procedure {function_name} success!")
return rows_affected
except Exception as e:
logger.error(f"Running procedure {function_name} failed!")
logger.exception(e)
return None
finally:
connection.close()
并且 this answer 我也找到了这个解决方案:
b) 代替使用原始连接,这同样有效:
def call_procedure(engine, function_name, params=None):
try:
with engine.begin() as db_conn:
db_conn.execute(f"""CALL {function_name}(@out)""")
results = db_conn.execute('SELECT @out').fetchone() ## returns a tuple e.g. (285,)
rows_affected = results[0]
logger.debug(f"Running procedure {function_name} success!")
return rows_affected
except Exception as e:
logger.error(f"Running procedure {function_name} failed!")
logger.exception(e)
return None
finally:
if db_conn: db_conn.close()
如果使用其中一种方法相对于另一种方法有任何优点或缺点,请在评论中告诉我。
我只是想添加另一段代码,因为我试图让 callproc
与 multiple in- 和 out-params[=27 一起工作(使用 sqlalchemy) =].
对于这种情况,我在之前的回答中使用了使用原始连接 [解决方案 b)] 的 callproc
方法,因为此函数接受参数作为列表。
它可能在某些部分做得更优雅或更 pythonic,但它主要是为了让它工作,我可能会从中创建一个函数,这样我就可以用它来一般地调用具有多个的 SP和输出参数。
我在下面的代码中添加了注释,以便更容易理解正在发生的事情。
在我的例子中,我决定将 out-params 放在一个字典中,这样我就可以将它传递给调用应用程序,以防我需要对结果做出反应。当然你也可以包括 in-params 这可能对错误记录有意义。
## some in params
function_name = 'sp_upsert'
in_param1 = 'my param 1'
in_param2 = 'abcdefg'
in_param3 = 'some-name'
in_param4 = 'some display name'
in_params = [in_param1, in_param1, in_param1, in_param1]
## out params
out_params = [
'out1_row_count'
,'out2_row_count'
,'out3_row_count'
,'out4_row_count_ins'
,'out5_row_count_upd'
]
params = copy(in_params)
## adding the outparams as integers from out_params indices
params.extend([i for i, x in enumerate(out_params)])
## the params list will look like
## ['my param 1', 'abcdefg', 'some-name', 'some display name', 0, 1, 2, 3, 4]
logger.info(params)
## build query to get results from callproc (including in and out params)
res_qry_params = []
for i in range(len(params)):
res_qry_params.append(f"@_{function_name}_{i}")
res_qry = f"SELECT {', '.join(res_qry_params)}"
## the query to fetch the results (in and out params) will look like
## SELECT @_sp_upsert_0, @_sp_upsert_1, @_sp_upsert_2, @_sp_upsert_3, @_sp_upsert_4, @_sp_upsert_5, @_sp_upsert_6, @_sp_upsert_7, @_sp_upsert_8
logger.info(res_qry)
try:
connection = engine.raw_connection()
## calling the sp
cursor = connection.cursor()
cursor.callproc(function_name, params)
## get the results (includes in and out params), the 0/1 in the end are the row_counts from the sp
## fetchone is enough since all results come as on result record like
## ('my param 1', 'abcdefg', 'some-name', 'some display name', 1, 0, 1, 1, 0)
cursor.execute(res_qry)
results = cursor.fetchone()
logger.info(results)
## adding just the out params to a dict
res_dict = {}
for i, element in enumerate(out_params):
res_dict.update({
element: results[i + len(in_params)]
})
## the result dict in this case only contains the out param results and will look like
## { 'out1_row_count': 1,
## 'out2_row_count': 0,
## 'out3_row_count': 1,
## 'out4_row_count_ins': 1,
## 'out5_row_count_upd': 0}
logger.info(pformat(res_dict, indent=2, sort_dicts=False))
cursor.close()
connection.commit()
logger.debug(f"Running procedure {function_name} success!")
except Exception as e:
logger.error(f"Running procedure {function_name} failed!")
logger.exception(e)
为了完成图片,这里是我的存储过程的简化版本。在 BEGIN 之后,我声明了一些错误处理程序,我将输出参数设置为默认值 0,否则它们也可以 return as NULL/None 如果过程未设置(例如,因为没有插入):
DELIMITER //
CREATE OR REPLACE PROCEDURE sp_upsert(
IN in_param1 VARCHAR(32),
IN in_param2 VARCHAR(250),
IN in_param3 VARCHAR(250),
IN in_param4 VARCHAR(250),
OUT out1_row_count INTEGER,
OUT out2_row_count INTEGER,
OUT out3_row_count INTEGER,
OUT out4_row_count_ins INTEGER,
OUT out5_row_count_upd INTEGER
)
BEGIN
-- declare variables, do NOT declare the out params here!
DECLARE dummy INTEGER DEFAULT 0;
-- declare error handlers (e.g. continue handler for not found)
DECLARE CONTINUE HANDLER FOR NOT FOUND SET dummy = 1;
-- set out params defaulting to 0
SET out1_row_count = 0;
SET out2_row_count = 0;
SET out3_row_count = 0;
SET out4_row_count_ins = 0;
SET out5_row_count_upd = 0;
-- do inserts and updates and set the outparam variables accordingly
INSERT INTO some_table ...;
SET out1_row_count = ROW_COUNT();
-- commit if no errors
COMMIT;
END //
DELIMITER ;
我正在尝试从 MySQL 存储过程中获取 ROW_COUNT()
到 python。
这是我得到的,但我不知道我错过了什么。
DELIMITER //
CREATE OR REPLACE PROCEDURE sp_refresh_mytable(
OUT row_count INT
)
BEGIN
DECLARE exit handler for SQLEXCEPTION
BEGIN
ROLLBACK;
END;
DECLARE exit handler for SQLWARNING
BEGIN
ROLLBACK;
END;
DECLARE exit handler FOR NOT FOUND
BEGIN
ROLLBACK;
END;
START TRANSACTION;
DELETE FROM mytable;
INSERT INTO mytable
(
col1
, col2
)
SELECT
col1
, col2
FROM othertable
;
SET row_count = ROW_COUNT();
COMMIT;
END //
DELIMITER ;
如果我像下面这样通过正常 SQL 调用它,我会得到正确的 row_count 插入操作(例如插入 26 行):
CALL sp_refresh_mytable(@rowcount);
select @rowcount as t;
-- output: 26
然后在python/mysqlalchemy:
def call_procedure(engine, function_name, params=None):
connection = engine.raw_connection()
try:
cursor = connection.cursor()
result = cursor.callproc('sp_refresh_mytable', [0])
## try result outputs
resultfetch = cursor.fetchone()
logger.info(result)
logger.info(result[0])
logger.info(resultfetch)
cursor.close()
connection.commit()
connection.close()
logger.info(f"Running procedure {function_name} success!")
return result
except Exception as e:
logger.error(f"Running procedure {function_name} failed!")
logger.exception(e)
return None
finally:
connection.close()
所以我尝试记录获取输出值的不同变体,但它始终为 0 或 None。
[INFO] db_update [0]
[INFO] db_update 0
[INFO] db_update None
我错过了什么?
谢谢!
在 this answer 的帮助下,我找到了以下适合我的解决方案。
a) 使用 engine.raw_connection()
和 cursor.callproc
的工作解决方案:
def call_procedure(engine, function_name):
connection = engine.raw_connection()
try:
cursor = connection.cursor()
cursor.callproc(function_name, [0])
cursor.execute(f"""SELECT @_{function_name}_0""")
results = cursor.fetchone() ## returns a tuple e.g. (285,)
rows_affected = results[0]
cursor.close()
connection.commit()
logger.info(f"Running procedure {function_name} success!")
return rows_affected
except Exception as e:
logger.error(f"Running procedure {function_name} failed!")
logger.exception(e)
return None
finally:
connection.close()
并且 this answer 我也找到了这个解决方案:
b) 代替使用原始连接,这同样有效:
def call_procedure(engine, function_name, params=None):
try:
with engine.begin() as db_conn:
db_conn.execute(f"""CALL {function_name}(@out)""")
results = db_conn.execute('SELECT @out').fetchone() ## returns a tuple e.g. (285,)
rows_affected = results[0]
logger.debug(f"Running procedure {function_name} success!")
return rows_affected
except Exception as e:
logger.error(f"Running procedure {function_name} failed!")
logger.exception(e)
return None
finally:
if db_conn: db_conn.close()
如果使用其中一种方法相对于另一种方法有任何优点或缺点,请在评论中告诉我。
我只是想添加另一段代码,因为我试图让 callproc
与 multiple in- 和 out-params[=27 一起工作(使用 sqlalchemy) =].
对于这种情况,我在之前的回答中使用了使用原始连接 [解决方案 b)] 的 callproc
方法,因为此函数接受参数作为列表。
它可能在某些部分做得更优雅或更 pythonic,但它主要是为了让它工作,我可能会从中创建一个函数,这样我就可以用它来一般地调用具有多个的 SP和输出参数。
我在下面的代码中添加了注释,以便更容易理解正在发生的事情。
在我的例子中,我决定将 out-params 放在一个字典中,这样我就可以将它传递给调用应用程序,以防我需要对结果做出反应。当然你也可以包括 in-params 这可能对错误记录有意义。
## some in params
function_name = 'sp_upsert'
in_param1 = 'my param 1'
in_param2 = 'abcdefg'
in_param3 = 'some-name'
in_param4 = 'some display name'
in_params = [in_param1, in_param1, in_param1, in_param1]
## out params
out_params = [
'out1_row_count'
,'out2_row_count'
,'out3_row_count'
,'out4_row_count_ins'
,'out5_row_count_upd'
]
params = copy(in_params)
## adding the outparams as integers from out_params indices
params.extend([i for i, x in enumerate(out_params)])
## the params list will look like
## ['my param 1', 'abcdefg', 'some-name', 'some display name', 0, 1, 2, 3, 4]
logger.info(params)
## build query to get results from callproc (including in and out params)
res_qry_params = []
for i in range(len(params)):
res_qry_params.append(f"@_{function_name}_{i}")
res_qry = f"SELECT {', '.join(res_qry_params)}"
## the query to fetch the results (in and out params) will look like
## SELECT @_sp_upsert_0, @_sp_upsert_1, @_sp_upsert_2, @_sp_upsert_3, @_sp_upsert_4, @_sp_upsert_5, @_sp_upsert_6, @_sp_upsert_7, @_sp_upsert_8
logger.info(res_qry)
try:
connection = engine.raw_connection()
## calling the sp
cursor = connection.cursor()
cursor.callproc(function_name, params)
## get the results (includes in and out params), the 0/1 in the end are the row_counts from the sp
## fetchone is enough since all results come as on result record like
## ('my param 1', 'abcdefg', 'some-name', 'some display name', 1, 0, 1, 1, 0)
cursor.execute(res_qry)
results = cursor.fetchone()
logger.info(results)
## adding just the out params to a dict
res_dict = {}
for i, element in enumerate(out_params):
res_dict.update({
element: results[i + len(in_params)]
})
## the result dict in this case only contains the out param results and will look like
## { 'out1_row_count': 1,
## 'out2_row_count': 0,
## 'out3_row_count': 1,
## 'out4_row_count_ins': 1,
## 'out5_row_count_upd': 0}
logger.info(pformat(res_dict, indent=2, sort_dicts=False))
cursor.close()
connection.commit()
logger.debug(f"Running procedure {function_name} success!")
except Exception as e:
logger.error(f"Running procedure {function_name} failed!")
logger.exception(e)
为了完成图片,这里是我的存储过程的简化版本。在 BEGIN 之后,我声明了一些错误处理程序,我将输出参数设置为默认值 0,否则它们也可以 return as NULL/None 如果过程未设置(例如,因为没有插入):
DELIMITER //
CREATE OR REPLACE PROCEDURE sp_upsert(
IN in_param1 VARCHAR(32),
IN in_param2 VARCHAR(250),
IN in_param3 VARCHAR(250),
IN in_param4 VARCHAR(250),
OUT out1_row_count INTEGER,
OUT out2_row_count INTEGER,
OUT out3_row_count INTEGER,
OUT out4_row_count_ins INTEGER,
OUT out5_row_count_upd INTEGER
)
BEGIN
-- declare variables, do NOT declare the out params here!
DECLARE dummy INTEGER DEFAULT 0;
-- declare error handlers (e.g. continue handler for not found)
DECLARE CONTINUE HANDLER FOR NOT FOUND SET dummy = 1;
-- set out params defaulting to 0
SET out1_row_count = 0;
SET out2_row_count = 0;
SET out3_row_count = 0;
SET out4_row_count_ins = 0;
SET out5_row_count_upd = 0;
-- do inserts and updates and set the outparam variables accordingly
INSERT INTO some_table ...;
SET out1_row_count = ROW_COUNT();
-- commit if no errors
COMMIT;
END //
DELIMITER ;