ERR_STREAM_WRITE_AFTER_END 在 Node.js 上使用 MySQL2 运行 执行查询时

ERR_STREAM_WRITE_AFTER_END when executing query with MySQL2 running on Node.js

我正在实现一项功能,该功能应在单击按钮时通过向后端发送请求并使用 MySQL2 更新数据库来延长用户会话的生命周期。

为此,我编写了以下前端代码:

onClose: function (oAction) {
    try {
        if (oAction == "YES") {
            let reqURL = "/sessionExtend";
            let reqData = {
                session_id: sessionStorage.getItem("SessionId"),
                user_id: sessionStorage.getItem("UserId")
            };
            let callbackOK = function (responseData) {
                curr.onSuccessfulResponse(curr, responseData, "sessionExtendSuccess", "sessionExtendFail", "", false);
            };
            let callbackErr = function (responseData) {
                curr.onErrorResponse(curr, responseData, "sessionExtendFail");
            };

            curr.performRequest(reqURL, reqData, callbackOK, callbackErr);
        }
    } catch (err) {
        console.log(err);
        MessageToast.show(sMsg);
    }
}

请求由 app.js 接收,它使用 MySQL2 建立数据库连接并将请求转发到 DAL:

app.post("/sessionExtend", async function (req, res) {

    let session_id = req.body.session_id;
    let user_id = req.body.user_id;

    let con = DAL.getConnection();

    res.setHeader("Content-Type", "application/json");

    try {

        const response = await DAL.sessionExtend(con, session_id, user_id);

        res.send(JSON.stringify({
            "result": true,
            "message": "session extended"
        }));

    } catch (e) {

        res.send(JSON.stringify({
            "result": false,
            "message": "can not extend session"
        }));

    }

    con.close();

});

DAL 模块执行 SQL 查询,结果 return 应该是成功还是错误:

sessionExtend: async function sessionExtend(con, session_id, user_id) {

    con.connect(function (err) {
        try {
            if (err) throw err;
            con.query(qryDict.SQL_QUERIES.setUpdateExtendSession, [session_id, user_id], function (err) {

                let result;

                if (err) {
                    result = JSON.stringify({
                        "result": false,
                        "message": "failure"
                    });
                } else {
                    result = JSON.stringify({
                        "result": true,
                        "message": "success"
                    });
                }

                return result;

            });
        } catch (err) {
            let result = JSON.stringify({
                "result": false,
                "message": err
            });

            return result;
        }
    });
},

问题是当我在调试器中执行这段代码时,出现异常:

ERR_STREAM_WRITE_AFTER_END Error [ERR_STREAM_WRITE_AFTER_END]: write after end at Socket.Writable.write (_stream_writable.js:297:11) at Connection.write (C:\Users\User\IdeaProjects\TST\node_modules\mysql2\lib\connection.js:226:17) at Connection.writePacket (C:\Users\User\IdeaProjects\TST\node_modules\mysql2\lib\connection.js:271:12) at ClientHandshake.sendCredentials (C:\Users\User\IdeaProjects\TST\node_modules\mysql2\lib\commands\client_handshake.js:64:16) at ClientHandshake.handshakeInit (C:\Users\User\IdeaProjects\TST\node_modules\mysql2\lib\commands\client_handshake.js:137:12) at ClientHandshake.execute (C:\Users\User\IdeaProjects\TST\node_modules\mysql2\lib\commands\command.js:39:22) at Connection.handlePacket (C:\Users\User\IdeaProjects\TST\node_modules\mysql2\lib\connection.js:417:32) at PacketParser.onPacket (C:\Users\User\IdeaProjects\TST\node_modules\mysql2\lib\connection.js:75:12) at PacketParser.executeStart (C:\Users\User\IdeaProjects\TST\node_modules\mysql2\lib\packet_parser.js:75:16) at Socket. (C:\Users\User\IdeaProjects\TST\node_modules\mysql2\lib\connection.js:82:25)

我也注意到在调试的时候,我先在前端得到了后端的响应,然后才到达DAL中的断点con.query(qryDict.SQL_QUERIES.setUpdateExtendSession, [session_id, user_id], function (err) {…}

我的问题:

  1. 为什么会出现 ERR_STREAM_WRITE_AFTER_END 以及如何避免?

  2. 为什么我先在前端得到后端的响应,然后才到达DAL中的断点?我假设 await DAL.sessionExtend(con, session_id, user_id) 应该等到 DAL 上的任务完成并且承诺得到解决。

简而言之:你不是在等待con.connectioncon.query,所以外层代码继续调用con.close和returns 是前端结果,稍后 con.query 尝试通过现已关闭的连接发送查询,导致此异常。

您正在编写异步函数,但您只创建了它们 "half-async"。

例如,这行不通:

async function getStuff () {
  stuff.get(function (err, data) {
    if (err) throw err // kills your process if it happend!
    return data.stuff // returns to nowhere
  })
}

// later on:
const stuff = await getStuff()
console.log(stuff) // prints undefined!

...因为本质上,您的异步函数只是同步调用另一个函数(不等待它)然后立即 return 什么都没有(即 undefined):

async function getStuff () {
  stuff.get(...)
  // as you can see, no return inside getStuff
}

稍后,您传递的回调将运行,但是您的外部代码的列车已经离开站台。

你想要做的是 stuff.get return 一个承诺(大多数现代库都会这样做,即使它们另外公开了一个与旧代码库兼容的回调承诺)和 await它:

async function getStuff () {
  const data = await stuff.get() // waits for the stuff to come back
  return data.stuff // actually returns the stuff
  // The `if (err) throw err` now became unnecessary as well
}

// later on:
const stuff = await getStuff()
console.log(stuff) // prints the stuff!

如果您的 SQL 库会公开一个 promise 接口,您可以简单地 await 它。您写道您正在使用 mysql2。如果 require('mysql2/promise') 需要,这个库有一个 promise 接口。我建议切换到 promise 接口而不是回调接口!

还有一种方法可以 "upgrade" 现有的 con 连接到 promise 接口:con.promise()。所以你只需做 con = DAL.getConnection().promise() 而不是 con = DAL.getConnection().

然后,您可以像这样重写代码(或等效代码,具体取决于您选择的库):

async function sessionExtend(con, session_id, user_id) {
    try {
        await con.connect()
        await con.query(qryDict.SQL_QUERIES.setUpdateExtendSession, [session_id, user_id])
        return JSON.stringify({ result: true, message: 'success' })
    } catch (err) {
        return JSON.stringify({ result: false, message: err.toString() })
    }
}

编辑: 以下部分实际上已过时,因为 mysql2 允许将现有连接升级到 promise 接口,但我将保留此部分无论如何,如果它能帮助处于类似情况的其他人!

如果你不能切换到 promise 接口,你可以改为 promisify 现有的调用(虽然它看起来有点复杂):

const { promisify } = require('util')

async function sessionExtend(con, session_id, user_id) {
    try {
        await promisify(con.connect).call(con)
        await promisify(con.query).call(con, qryDict.SQL_QUERIES.setUpdateExtendSession, [session_id, user_id])
        return JSON.stringify({ result: true, message: 'success' })
    } catch (err) {
        return JSON.stringify({ result: false, message: err.toString() })
    }
}

util.promisify 包装了一个需要 (err, data) 回调的函数,将其转换为一个 return 是一个 promise 的异步函数。由于 con.query 等人。虽然是 con 上的方法,但它们需要保留该上下文,这就是为什么我写 promisify(con.query).call(con, ...) 而不是 promisify(con.query)(...).

CherryDT 的帮助下,问题已通过切换到 ES7 async/await-wrapper 版本 MySQL2 解决— mysql2/promise.

为了节省时间剩下的public,最后的即用代码:

app.js

app.post("/sessionExtend", async function (req, res) {

    let session_id = req.body.session_id;
    let user_id = req.body.user_id;

    const con = await DAL.getConnection();

    res.setHeader("Content-Type", "application/json");

    const response = await DAL.sessionExtend(con, session_id, user_id);

    res.send(JSON.stringify({
        "result": response.result,
        "message": response.message
    }));

    await con.close();

});

DAL.js

sessionExtend: async function sessionExtend(con, session_id, user_id) {

    let result;

    const [rows, fields] = await con.execute(qryDict.SQL_QUERIES.setUpdateExtendSession, [session_id, user_id]);

    if (rows.warningStatus === 0) {
        result = {
            "result": true,
            "message": "session extended"
        };
    } else {
        result = {
            "result": false,
            "message": "session is not extended"
        };
    }

    return result;

},

如您所见,现在代码更易于理解和维护。

P.S。我的建议:使用 async/await,它们很棒,尽量避免回调。