使用 Express 流式传输结果时获取连接超时

Timeout acquiring a connection when streaming results using Express

我们使用以下代码将查询结果流式传输回客户端:

app.get('/events', (req, res) => {
  try {
    const stream = db('events')
      .select('*')
      .where({ id_user: 'foo' })
      .stream()

    stream.pipe(JSONStream.stringify()).pipe(res)
  } catch (err) {
    next(err)
  }
})

虽然代码似乎具有出色的内存使用情况(stable/low 内存使用情况),但它会创建随机数据库连接获取超时:

Knex: Timeout acquiring a connection. The pool is probably full. Are you missing a .transacting(trx) call?

这在生产中以看似随机的时间间隔发生。知道为什么吗?

发生这种情况是因为中止的请求(即客户端在请求中关闭浏览器)不会将连接释放回池中。

首先,确保您使用的是最新的 knex;或至少 v0.21.3+ which has introduced fixes to stream/pool handling.

从现在开始,您有几个选择:

要么使用 stream.pipeline instead of stream.pipe 正确处理中止的请求,如下所示:

const { pipeline } = require('stream')

app.get('/events', (req, res) => {
  try {
    const stream = db('events')
      .select('*')
      .where({ id_session: req.query.id_session })
      .stream()

    return pipeline(stream, JSONStream.stringify(), res, err => {
      if (err) {
        return console.log(`Pipeline failed with err:`, err)
      }

      console.log(`Pipeline ended succesfully`)
    })
  } catch (err) {
    next(err)
  }
})

或在 req 上监听 [close][关闭] 事件并自行销毁数据库流,如下所示:

app.get('/events', (req, res) => {
  try {
    const stream = db('events')
      .select('*')
      .where({ id_session: req.query.id_session })
      .stream()

    // Not listening to this event will crash the process if
    // stream.destroy(err) is called.
    stream.on('error', () => {
      console.log('Stream was destroyed')
    })

    req.on('close', () => {
      // stream.end() does not seem to work, only destroy()
      stream.destroy('Aborted request')
    })

    stream.pipe(JSONStream.stringify()).pipe(res)
  } catch (err) {
    next(err)
  }
})

有用的阅读: