pg-promise :取消使用 pg-query-stream 发起的查询
pg-promise : Cancel a query initiated with pg-query-stream
我有一个 postgresql table 每天都有成千上万的时间序列数据。我有一个应用程序允许用户检索这些数据。查询可能需要 200 毫秒到 30 秒,具体取决于时间范围,因此这些查询必须是可取消的,以避免对生产造成无用的负载。
数十亿的数据,使用流来检索是不可避免的。
所以我设法获得了一个像 pg-promise 文档中那样具有数据流的工作端点,并通过在 pg-query-stream
.
内关闭光标使其可取消
这是在此端点内完成的示例(在构建查询后调用 dataStream()):
const pgp = require("pg-promise")();
const QueryStream = require("pg-query-stream");
const db = pgp({
host: "1.2.3.4",
port: 5432,
database: "db",
user: "user",
password: "password",
max: 2,
});
// query is an SQL string
dataStream(query, req, res, next) {
const qs = new QueryStream(query);
// "close" event is triggered on client request cancelation
req.on("close", () => {
qs.destroy();
});
return db.stream(qs, s => {
s.pipe(JSONStream.stringify()).pipe(res);
s.on("error", error => handleError(error));
})
.catch(error => handleError(error, query));
}
它适用于几次调用,但在某些时候(快速执行 8 到 10 次调用以检查可取消性),应用程序会因此堆栈而崩溃:
\node_modules\pg-promise\node_modules\pg\lib\client.js:346
if (self.activeQuery.name) {
^
TypeError: Cannot read property 'name' of null
at Connection.<anonymous> (\node_modules\pg-promise\node_modules\pg\lib\client.js:346:26)
at Connection.emit (events.js:311:20)
at Socket.<anonymous> (\node_modules\pg-promise\node_modules\pg\lib\connection.js:120:12)
at Socket.emit (events.js:311:20)
at addChunk (_stream_readable.js:294:12)
at readableAddChunk (_stream_readable.js:275:11)
at Socket.Readable.push (_stream_readable.js:209:10)
at TCP.onStreamRead (internal/stream_base_commons.js:186:23)
所以我怀疑调用 qs.destroy() 关闭流不是正确的方法,即使游标在服务器端已被完全破坏。
感谢 node-postgres 和 pg-promise 开发人员的工作。
对于那些感兴趣的人,经过多次尝试,我找到了一个可行的解决方案。它还解决了我遇到的另一个问题:通过发送垃圾邮件请求来检查他们的可取消性,我注意到池中的一些客户端永远挂起并且永远不会 return,导致池满并且新请求永远挂起。
我认为这可以通过以下事实来解释:res
在流中通过管道传输,并且由于请求已被取消,可读流永远不会被消耗和挂起。
我的代码中的另一个问题是 req.on("close",
并不总是被触发。
为了解决这个问题,我找到了一个名为 on-finished
的模块,它的功能完全符合要求。
此外,调用 qs.destroy()
并不是正确的方法。经过长时间的调试,没有未处理错误的最一致的方法是从 pgp 的 Database.connect() 获取 Connection 对象并通过调用 connection.done().
结束查询
所以这是我的解决方案:
const pgp = require("pg-promise")();
const QueryStream = require("pg-query-stream");
const JSONStream = require("JSONStream");
const onFinished = require("on-finished");
const db = pgp({
host: "1.2.3.4",
port: 5432,
database: "db",
user: "user",
password: "password",
max: 2,
});
// query is an SQL string
async function dataStream(query, req, res, next) {
try {
if (query instanceof Object) {
query = query.toString();
}
const connection = await db.connect();
const qs = new QueryStream(query, [], {highWaterMark: 4000});
const streamData = connection.client.query(qs);
onFinished(res, () => {
// Calling .done() to end the connection on request close.
// Weirdly I sometimes get an error if I do not provide a callback.
connection.done(error => {
log.error(error);
});
});
streamData.pipe(JSONStream.stringify()).pipe(res);
streamData.on("error", error => {
next(error);
});
} catch (error) {
next(error);
}
}
我有一个 postgresql table 每天都有成千上万的时间序列数据。我有一个应用程序允许用户检索这些数据。查询可能需要 200 毫秒到 30 秒,具体取决于时间范围,因此这些查询必须是可取消的,以避免对生产造成无用的负载。
数十亿的数据,使用流来检索是不可避免的。
所以我设法获得了一个像 pg-promise 文档中那样具有数据流的工作端点,并通过在 pg-query-stream
.
这是在此端点内完成的示例(在构建查询后调用 dataStream()):
const pgp = require("pg-promise")();
const QueryStream = require("pg-query-stream");
const db = pgp({
host: "1.2.3.4",
port: 5432,
database: "db",
user: "user",
password: "password",
max: 2,
});
// query is an SQL string
dataStream(query, req, res, next) {
const qs = new QueryStream(query);
// "close" event is triggered on client request cancelation
req.on("close", () => {
qs.destroy();
});
return db.stream(qs, s => {
s.pipe(JSONStream.stringify()).pipe(res);
s.on("error", error => handleError(error));
})
.catch(error => handleError(error, query));
}
它适用于几次调用,但在某些时候(快速执行 8 到 10 次调用以检查可取消性),应用程序会因此堆栈而崩溃:
\node_modules\pg-promise\node_modules\pg\lib\client.js:346
if (self.activeQuery.name) {
^
TypeError: Cannot read property 'name' of null
at Connection.<anonymous> (\node_modules\pg-promise\node_modules\pg\lib\client.js:346:26)
at Connection.emit (events.js:311:20)
at Socket.<anonymous> (\node_modules\pg-promise\node_modules\pg\lib\connection.js:120:12)
at Socket.emit (events.js:311:20)
at addChunk (_stream_readable.js:294:12)
at readableAddChunk (_stream_readable.js:275:11)
at Socket.Readable.push (_stream_readable.js:209:10)
at TCP.onStreamRead (internal/stream_base_commons.js:186:23)
所以我怀疑调用 qs.destroy() 关闭流不是正确的方法,即使游标在服务器端已被完全破坏。
感谢 node-postgres 和 pg-promise 开发人员的工作。
对于那些感兴趣的人,经过多次尝试,我找到了一个可行的解决方案。它还解决了我遇到的另一个问题:通过发送垃圾邮件请求来检查他们的可取消性,我注意到池中的一些客户端永远挂起并且永远不会 return,导致池满并且新请求永远挂起。
我认为这可以通过以下事实来解释:res
在流中通过管道传输,并且由于请求已被取消,可读流永远不会被消耗和挂起。
我的代码中的另一个问题是 req.on("close",
并不总是被触发。
为了解决这个问题,我找到了一个名为 on-finished
的模块,它的功能完全符合要求。
此外,调用 qs.destroy()
并不是正确的方法。经过长时间的调试,没有未处理错误的最一致的方法是从 pgp 的 Database.connect() 获取 Connection 对象并通过调用 connection.done().
所以这是我的解决方案:
const pgp = require("pg-promise")();
const QueryStream = require("pg-query-stream");
const JSONStream = require("JSONStream");
const onFinished = require("on-finished");
const db = pgp({
host: "1.2.3.4",
port: 5432,
database: "db",
user: "user",
password: "password",
max: 2,
});
// query is an SQL string
async function dataStream(query, req, res, next) {
try {
if (query instanceof Object) {
query = query.toString();
}
const connection = await db.connect();
const qs = new QueryStream(query, [], {highWaterMark: 4000});
const streamData = connection.client.query(qs);
onFinished(res, () => {
// Calling .done() to end the connection on request close.
// Weirdly I sometimes get an error if I do not provide a callback.
connection.done(error => {
log.error(error);
});
});
streamData.pipe(JSONStream.stringify()).pipe(res);
streamData.on("error", error => {
next(error);
});
} catch (error) {
next(error);
}
}