Python sqlalchemy 和 mySQL 存储过程总是 returns 0(仅输出参数)

Python sqlalchemy and mySQL stored procedure always returns 0 (out param only)

我正在尝试从 MySQL 存储过程中获取 ROW_COUNT() 到 python。

这是我得到的,但我不知道我错过了什么。

DELIMITER //
CREATE OR REPLACE PROCEDURE sp_refresh_mytable(
    OUT row_count INT
)
BEGIN
    DECLARE exit handler for SQLEXCEPTION
        BEGIN
            ROLLBACK;
        END;
    DECLARE exit handler for SQLWARNING
        BEGIN
            ROLLBACK;
        END;
    DECLARE exit handler FOR NOT FOUND
        BEGIN
            ROLLBACK;
        END;

    START TRANSACTION;
    DELETE FROM mytable;

    INSERT INTO mytable
    (
          col1
        , col2
    )
    SELECT
          col1
        , col2
    FROM othertable
    ;
    SET row_count =  ROW_COUNT();
    COMMIT;
END //

DELIMITER ;

如果我像下面这样通过正常 SQL 调用它,我会得到正确的 row_count 插入操作(例如插入 26 行):

CALL sp_refresh_mytable(@rowcount);
select @rowcount as t;
-- output: 26 

然后在python/mysqlalchemy:

def call_procedure(engine, function_name, params=None):
    connection = engine.raw_connection()
    try:
        cursor = connection.cursor()
        result = cursor.callproc('sp_refresh_mytable', [0])
        ## try result outputs
        resultfetch = cursor.fetchone()
        logger.info(result)
        logger.info(result[0])
        logger.info(resultfetch)
        cursor.close()
        connection.commit()
        connection.close()
        logger.info(f"Running procedure {function_name} success!")
        return result
    except Exception as e:
        logger.error(f"Running procedure {function_name} failed!")
        logger.exception(e)
        return None
    finally:
        connection.close()

所以我尝试记录获取输出值的不同变体,但它始终为 0 或 None。

[INFO] db_update    [0]
[INFO] db_update    0
[INFO] db_update    None

我错过了什么?

谢谢!

this answer 的帮助下,我找到了以下适合我的解决方案。

a) 使用 engine.raw_connection()cursor.callproc 的工作解决方案:

def call_procedure(engine, function_name):
    connection = engine.raw_connection()
    try:
        cursor = connection.cursor()
        cursor.callproc(function_name, [0])
        cursor.execute(f"""SELECT @_{function_name}_0""")
        results = cursor.fetchone() ## returns a tuple e.g. (285,)
        rows_affected = results[0]
        cursor.close()
        connection.commit()
        logger.info(f"Running procedure {function_name} success!")
        return rows_affected
    except Exception as e:
        logger.error(f"Running procedure {function_name} failed!")
        logger.exception(e)
        return None
    finally:
        connection.close()

并且 this answer 我也找到了这个解决方案:

b) 代替使用原始连接,这同样有效:

def call_procedure(engine, function_name, params=None):
    try:
        with engine.begin() as db_conn:
            db_conn.execute(f"""CALL {function_name}(@out)""")
            results = db_conn.execute('SELECT @out').fetchone() ## returns a tuple e.g. (285,)
            rows_affected = results[0]
        logger.debug(f"Running procedure {function_name} success!")
        return rows_affected
    except Exception as e:
        logger.error(f"Running procedure {function_name} failed!")
        logger.exception(e)
        return None
    finally:
        if db_conn: db_conn.close()

如果使用其中一种方法相对于另一种方法有任何优点或缺点,请在评论中告诉我。

我只是想添加另一段代码,因为我试图让 callprocmultiple in- 和 out-params[=27 一起工作(使用 sqlalchemy) =].

对于这种情况,我在之前的回答中使用了使用原始连接 [解决方案 b)] 的 callproc 方法,因为此函数接受参数作为列表。

它可能在某些部分做得更优雅或更 pythonic,但它主要是为了让它工作,我可能会从中创建一个函数,这样我就可以用它来一般地调用具有多个的 SP和输出参数。

我在下面的代码中添加了注释,以便更容易理解正在发生的事情。

在我的例子中,我决定将 out-params 放在一个字典中,这样我就可以将它传递给调用应用程序,以防我需要对结果做出反应。当然你也可以包括 in-params 这可能对错误记录有意义。

## some in params
function_name   = 'sp_upsert'
in_param1       = 'my param 1'
in_param2       = 'abcdefg'
in_param3       = 'some-name'
in_param4       = 'some display name'
in_params = [in_param1, in_param1, in_param1, in_param1]
## out params
out_params = [
     'out1_row_count'
    ,'out2_row_count'
    ,'out3_row_count'
    ,'out4_row_count_ins'
    ,'out5_row_count_upd'
]
params = copy(in_params)
## adding the outparams as integers from out_params indices
params.extend([i for i, x in enumerate(out_params)])
## the params list will look like
## ['my param 1', 'abcdefg', 'some-name', 'some display name', 0, 1, 2, 3, 4]
logger.info(params)
## build query to get results from callproc (including in and out params)
res_qry_params = []
for i in range(len(params)):
    res_qry_params.append(f"@_{function_name}_{i}")
res_qry = f"SELECT  {', '.join(res_qry_params)}"
## the query to fetch the results (in and out params) will look like
## SELECT  @_sp_upsert_0, @_sp_upsert_1, @_sp_upsert_2, @_sp_upsert_3, @_sp_upsert_4, @_sp_upsert_5, @_sp_upsert_6, @_sp_upsert_7, @_sp_upsert_8
logger.info(res_qry)
try:
    connection = engine.raw_connection()
    ## calling the sp
    cursor = connection.cursor()
    cursor.callproc(function_name, params)
    ## get the results (includes in and out params), the 0/1 in the end are the row_counts from the sp
    ## fetchone is enough since all results come as on result record like
    ## ('my param 1', 'abcdefg', 'some-name', 'some display name', 1, 0, 1, 1, 0)
    cursor.execute(res_qry)
    results = cursor.fetchone()
    logger.info(results)
    ## adding just the out params to a dict
    res_dict = {}
    for i, element in enumerate(out_params):
        res_dict.update({
            element: results[i + len(in_params)]
            })
    ## the result dict in this case only contains the out param results and will look like
    ## { 'out1_row_count': 1,
    ##   'out2_row_count': 0,
    ##   'out3_row_count': 1,
    ##   'out4_row_count_ins': 1,
    ##   'out5_row_count_upd': 0}
    logger.info(pformat(res_dict, indent=2, sort_dicts=False))
    cursor.close()
    connection.commit()
    logger.debug(f"Running procedure {function_name} success!")
except Exception as e:
    logger.error(f"Running procedure {function_name} failed!")
    logger.exception(e)

为了完成图片,这里是我的存储过程的简化版本。在 BEGIN 之后,我声明了一些错误处理程序,我将输出参数设置为默认值 0,否则它们也可以 return as NULL/None 如果过程未设置(例如,因为没有插入):

DELIMITER //
CREATE OR REPLACE PROCEDURE sp_upsert(
    IN in_param1 VARCHAR(32),
    IN in_param2 VARCHAR(250),
    IN in_param3 VARCHAR(250),
    IN in_param4 VARCHAR(250),
    OUT out1_row_count INTEGER,
    OUT out2_row_count INTEGER,
    OUT out3_row_count INTEGER,
    OUT out4_row_count_ins INTEGER,
    OUT out5_row_count_upd INTEGER
)
BEGIN
    -- declare variables, do NOT declare the out params here!
    DECLARE dummy INTEGER DEFAULT 0;
    
    -- declare error handlers (e.g. continue handler for not found)
    DECLARE CONTINUE HANDLER FOR NOT FOUND SET dummy = 1;
    
    -- set out params defaulting to 0
    SET out1_row_count = 0;
    SET out2_row_count = 0;
    SET out3_row_count = 0;
    SET out4_row_count_ins = 0;
    SET out5_row_count_upd = 0;

    -- do inserts and updates and set the outparam variables accordingly
    INSERT INTO some_table ...;
    SET out1_row_count = ROW_COUNT();

    -- commit if no errors
    COMMIT;
END //
DELIMITER ;