Node.js Streaming/Piping 错误处理(更改错误响应状态)
Node.js Streaming/Piping Error Handling (Change Response Status on Error)
我的 Cassandra 数据库中有数百万行,我想以 zip 文件的形式流式传输到客户端(不希望内存中有一个潜在的巨大 zip 文件)。我正在使用 Cassandra-Node driver 中的 stream() 函数,管道连接到一个 Transformer,它从我关心的每一行中提取一个字段并附加一个换行符,并通过管道将哪些管道归档到快速响应 object。这似乎工作正常,但我无法弄清楚如何在流式传输期间正确处理错误。我必须在为客户端流式传输之前设置适当的 headers/status,但是如果在流式传输期间出现错误,例如在 dbStream 上,我想清理所有管道并将响应状态重置为某种东西像 404。但是如果我在设置 headers 并开始流式传输后尝试重置状态,我会得到 Can't set headers after they are sent
。当 piping/streaming 到响应 object 时,我已经查看了所有内容,但找不到如何正确处理 Node 中的错误。如果我无法发送正确的错误响应代码,客户端如何判断是否实际传输了有效数据?有人可以帮忙吗?
function streamNamesToWriteStream(query, res, options) {
return new Promise((resolve, reject) => {
let success = true;
const dbStream = db.client.stream(query);
const rowTransformer = new Transform({
objectMode: true,
transform(row, encoding, callback) {
try {
const vote = row.name + '\n';
callback(null, vote);
} catch (err) {
callback(null, err.message + '\n');
}
}
});
// Handle res events
res.on('error', (err) => {
logger.error(`res ${res} error`);
return reject(err);
});
dbStream.on('error', function(err) {
res.status(404).send() // Can't set headers after they are sent.
logger.debug(`dbStream error: ${err}`);
success = false;
//res.end();
//return reject(err);
});
res.writeHead(200, {
'Content-Type': 'application/zip',
'Content-disposition': 'attachment; filename=myFile.zip'
});
const archive = archiver.create('zip');
archive.on('error', function(err) { throw err; });
archive.on('end', function(err) {
logger.debug(`Archive done`);
//res.status(404).end()
});
archive.pipe(res, {
//end:false
});
archive.append(dbStream.pipe(rowTransformer), { name: 'file1.txt' });
archive.append(dbStream.pipe(rowTransformer), { name: 'file1.txt' });
archive.finalize();
});
}
显然现在更改 headers 已经太晚了,因此必须有应用程序逻辑来检测问题。这是我的一些想法:
发生错误时,在流的末尾写入某种明确的哨兵。然后 zip 文件的使用者将需要查找该值以检查问题。
也许更简单,让消费者对 zip 存档的完整性执行验证。据推测,如果流失败,zip 将被损坏。
我的 Cassandra 数据库中有数百万行,我想以 zip 文件的形式流式传输到客户端(不希望内存中有一个潜在的巨大 zip 文件)。我正在使用 Cassandra-Node driver 中的 stream() 函数,管道连接到一个 Transformer,它从我关心的每一行中提取一个字段并附加一个换行符,并通过管道将哪些管道归档到快速响应 object。这似乎工作正常,但我无法弄清楚如何在流式传输期间正确处理错误。我必须在为客户端流式传输之前设置适当的 headers/status,但是如果在流式传输期间出现错误,例如在 dbStream 上,我想清理所有管道并将响应状态重置为某种东西像 404。但是如果我在设置 headers 并开始流式传输后尝试重置状态,我会得到 Can't set headers after they are sent
。当 piping/streaming 到响应 object 时,我已经查看了所有内容,但找不到如何正确处理 Node 中的错误。如果我无法发送正确的错误响应代码,客户端如何判断是否实际传输了有效数据?有人可以帮忙吗?
function streamNamesToWriteStream(query, res, options) {
return new Promise((resolve, reject) => {
let success = true;
const dbStream = db.client.stream(query);
const rowTransformer = new Transform({
objectMode: true,
transform(row, encoding, callback) {
try {
const vote = row.name + '\n';
callback(null, vote);
} catch (err) {
callback(null, err.message + '\n');
}
}
});
// Handle res events
res.on('error', (err) => {
logger.error(`res ${res} error`);
return reject(err);
});
dbStream.on('error', function(err) {
res.status(404).send() // Can't set headers after they are sent.
logger.debug(`dbStream error: ${err}`);
success = false;
//res.end();
//return reject(err);
});
res.writeHead(200, {
'Content-Type': 'application/zip',
'Content-disposition': 'attachment; filename=myFile.zip'
});
const archive = archiver.create('zip');
archive.on('error', function(err) { throw err; });
archive.on('end', function(err) {
logger.debug(`Archive done`);
//res.status(404).end()
});
archive.pipe(res, {
//end:false
});
archive.append(dbStream.pipe(rowTransformer), { name: 'file1.txt' });
archive.append(dbStream.pipe(rowTransformer), { name: 'file1.txt' });
archive.finalize();
});
}
显然现在更改 headers 已经太晚了,因此必须有应用程序逻辑来检测问题。这是我的一些想法:
发生错误时,在流的末尾写入某种明确的哨兵。然后 zip 文件的使用者将需要查找该值以检查问题。
也许更简单,让消费者对 zip 存档的完整性执行验证。据推测,如果流失败,zip 将被损坏。