获得关于流管道的不完整数据以进行快速响应
Got incomplete data on stream piping to an express response
需要将 DB table 转换为 csv 报告。
如果我立即用一个查询卸载整个 tablet,那么应用程序就会崩溃,因为内存用完了。我决定从 table 的 100 行部分中查询数据,将每一行转换为报告的一行并将其写入流中,该流通过管道进行快速响应。
这一切几乎是这样发生的:
数据库查询
const select100Users = (maxUserCreationDateStr) => {
return db.query(`
SELECT * FROM users WHERE created_at < to_timestamp(${maxUserCreationDateStr})
ORDER BY created_at DESC LIMIT 100`);
}
流初始化
const { PassThrough } = require('stream');
const getUserReportStream = () => {
const stream = new PassThrough();
writeUserReport(stream).catch((e) => stream.emit('error', e));
return stream;
};
通过快速响应管道传输流
app.get('/report', (req, res) => {
const stream = getUserReportStream();
res.setHeader('Content-Type', 'application/vnd.ms-excel');
res.setHeader(`Content-Disposition', 'attachment; filename="${ filename }"`);
stream.pipe(res);
});
最后是如何将数据写入流
const writeUserReport(stream) => {
let maxUserCreationDateGlobal = Math.trunc(Date.now() / 1000);
let flag = true;
stream.write(USER_REPORT_HEADER);
while (flag) {
const rows100 = await select100Users(maxUserCreationDateGlobal);
console.log(rows100.length);
if (rows100.length === 0) {
flag = false;
} else {
let maxUserCreationDate = maxUserCreationDateGlobal;
const users100 = await Promise.all(
rows100.map((r) => {
const created_at = r.created_at;
const createdAt = new Date(created_at);
if (created_at && createdAt.toString() !== 'Invalid Date') {
const createdAtNumber = Math.trunc(createdAt.valueOf() / 1000);
maxUserCreationDate = Math.min(maxUserCreationDate, createdAtNumber);
}
return mapUser(r); // returns a promise
})
);
users100.forEach((u) => stream.write(generateCsvRowFromUser(u)));
maxUserCreationDateGlobal = maxUserCreationDate;
if (rows100.length < 100) {
flag = false;
console.log('***');
}
}
}
console.log('end');
stream.end();
};
因此我在控制台中看到了这个输出:
100 // 100
100 // 200
100 // 300
100 // 400
100 // 500
87 // 587
***
end
但是在下载的文件中我得到了 401 行(第一行 USER_REPORT_HEADER)。感觉就像 stream.end()
在从流中读取所有值之前关闭流。
我尝试以类似的方式使用 rxjs 中的 BehaviorSubject 而不是 PassThrough - 结果是一样的..
我如何等待从流中读取我在那里写入的所有数据?
或者也许有人可以推荐一种替代方法来解决这个问题。
stream.write
希望您将回调作为第二个(或第三个参数)传递,以了解写入操作何时完成。除非之前的写操作完成,否则不能再次调用 write。
所以一般来说,我建议将整个函数设为异步,每次调用 stream.write
时都将其包装成 Promise,例如
await new Promise((resolve, reject) => stream.write(data, (error) => {
if (error) {
reject(error);
return;
}
resolve();
});
显然,将其提取到某种方法中是有意义的。
编辑:此外,我认为这不是实际问题。我假设您的 http 连接只是在所有提取完成之前超时,因此服务器最终将在达到超时期限后关闭流。
需要将 DB table 转换为 csv 报告。
如果我立即用一个查询卸载整个 tablet,那么应用程序就会崩溃,因为内存用完了。我决定从 table 的 100 行部分中查询数据,将每一行转换为报告的一行并将其写入流中,该流通过管道进行快速响应。
这一切几乎是这样发生的:
数据库查询
const select100Users = (maxUserCreationDateStr) => { return db.query(` SELECT * FROM users WHERE created_at < to_timestamp(${maxUserCreationDateStr}) ORDER BY created_at DESC LIMIT 100`); }
流初始化
const { PassThrough } = require('stream'); const getUserReportStream = () => { const stream = new PassThrough(); writeUserReport(stream).catch((e) => stream.emit('error', e)); return stream; };
通过快速响应管道传输流
app.get('/report', (req, res) => { const stream = getUserReportStream(); res.setHeader('Content-Type', 'application/vnd.ms-excel'); res.setHeader(`Content-Disposition', 'attachment; filename="${ filename }"`); stream.pipe(res); });
最后是如何将数据写入流
const writeUserReport(stream) => { let maxUserCreationDateGlobal = Math.trunc(Date.now() / 1000); let flag = true; stream.write(USER_REPORT_HEADER); while (flag) { const rows100 = await select100Users(maxUserCreationDateGlobal); console.log(rows100.length); if (rows100.length === 0) { flag = false; } else { let maxUserCreationDate = maxUserCreationDateGlobal; const users100 = await Promise.all( rows100.map((r) => { const created_at = r.created_at; const createdAt = new Date(created_at); if (created_at && createdAt.toString() !== 'Invalid Date') { const createdAtNumber = Math.trunc(createdAt.valueOf() / 1000); maxUserCreationDate = Math.min(maxUserCreationDate, createdAtNumber); } return mapUser(r); // returns a promise }) ); users100.forEach((u) => stream.write(generateCsvRowFromUser(u))); maxUserCreationDateGlobal = maxUserCreationDate; if (rows100.length < 100) { flag = false; console.log('***'); } } } console.log('end'); stream.end(); };
因此我在控制台中看到了这个输出:
100 // 100
100 // 200
100 // 300
100 // 400
100 // 500
87 // 587
***
end
但是在下载的文件中我得到了 401 行(第一行 USER_REPORT_HEADER)。感觉就像 stream.end()
在从流中读取所有值之前关闭流。
我尝试以类似的方式使用 rxjs 中的 BehaviorSubject 而不是 PassThrough - 结果是一样的..
我如何等待从流中读取我在那里写入的所有数据?
或者也许有人可以推荐一种替代方法来解决这个问题。
stream.write
希望您将回调作为第二个(或第三个参数)传递,以了解写入操作何时完成。除非之前的写操作完成,否则不能再次调用 write。
所以一般来说,我建议将整个函数设为异步,每次调用 stream.write
时都将其包装成 Promise,例如
await new Promise((resolve, reject) => stream.write(data, (error) => {
if (error) {
reject(error);
return;
}
resolve();
});
显然,将其提取到某种方法中是有意义的。
编辑:此外,我认为这不是实际问题。我假设您的 http 连接只是在所有提取完成之前超时,因此服务器最终将在达到超时期限后关闭流。