考虑背压将数据从 Cassandra 流式传输到文件
Stream data from Cassandra to file considering backpressure
我有 Node App 可以收集投票提交并将它们存储在 Cassandra 中。投票存储为 base64 编码的加密字符串。 API 有一个名为 /export
的端点,它应该获取所有这些投票字符串(可能 > 100 万),将它们转换为二进制并将它们一个接一个地附加到 votes.egd 文件中。然后应该将该文件压缩并发送给客户端。我的想法是流式传输来自 Cassandra 的行,将每个投票字符串转换为二进制并写入 WriteStream
。
我想将此功能包装在 Promise 中以便于使用。我有以下内容:
streamVotesToFile(query, validVotesFileBasename) {
return new Promise((resolve, reject) => {
const writeStream = fs.createWriteStream(`${validVotesFileBasename}.egd`);
writeStream.on('error', (err) => {
logger.error(`Writestream ${validVotesFileBasename}.egd error`);
reject(err);
});
writeStream.on('drain', () => {
logger.info(`Writestream ${validVotesFileBasename}.egd error`);
})
db.client.stream(query)
.on('readable', function() {
let row = this.read();
while (row) {
const envelope = new Buffer(row.vote, 'base64');
if(!writeStream.write(envelope + '\n')) {
logger.error(`Couldn't write vote`);
}
row = this.read()
}
})
.on('end', () => { // No more rows from Cassandra
writeStream.end();
writeStream.on('finish', () => {
logger.info(`Stream done writing`);
resolve();
});
})
.on('error', (err) => { // err is a response error from Cassandra
reject(err);
});
});
}
当我 运行 这样做时,它会将所有选票附加到一个文件中并可以正常下载。但是我有一堆problems/questions:
如果我向 /export
端点发出请求并且此函数 运行s,而它 运行ning 对应用程序的所有其他请求都非常慢或者在导出请求完成之前不要完成。我猜是因为事件循环被来自 Cassandra 流的所有这些事件占用(每秒数千次)?
所有选票似乎都可以正常写入文件,但我几乎每次 writeStream.write()
调用都会得到 false
并查看相应的记录消息(参见代码)?
我知道我需要考虑 WritableStream 的背压和 'drain' 事件,所以理想情况下我会使用 pipe()
并将投票传送到一个文件,因为它已经构建在背压支持中(对吗?)但由于我需要处理每一行(转换为二进制并可能在将来添加来自其他行字段的其他数据),我将如何使用管道执行此操作?
这是 TransformStream
的完美用例:
const myTransform = new Transform({
readableObjectMode: true,
transform(row, encoding, callback) {
// Transform the row into something else
const item = new Buffer(row['vote'], 'base64');
callback(null, item);
}
});
client.stream(query, params, { prepare: true })
.pipe(myTransform)
.pipe(fileStream);
在 Node.js API Docs 中查看有关如何实施 TransformStream
的更多信息。
我有 Node App 可以收集投票提交并将它们存储在 Cassandra 中。投票存储为 base64 编码的加密字符串。 API 有一个名为 /export
的端点,它应该获取所有这些投票字符串(可能 > 100 万),将它们转换为二进制并将它们一个接一个地附加到 votes.egd 文件中。然后应该将该文件压缩并发送给客户端。我的想法是流式传输来自 Cassandra 的行,将每个投票字符串转换为二进制并写入 WriteStream
。
我想将此功能包装在 Promise 中以便于使用。我有以下内容:
streamVotesToFile(query, validVotesFileBasename) {
return new Promise((resolve, reject) => {
const writeStream = fs.createWriteStream(`${validVotesFileBasename}.egd`);
writeStream.on('error', (err) => {
logger.error(`Writestream ${validVotesFileBasename}.egd error`);
reject(err);
});
writeStream.on('drain', () => {
logger.info(`Writestream ${validVotesFileBasename}.egd error`);
})
db.client.stream(query)
.on('readable', function() {
let row = this.read();
while (row) {
const envelope = new Buffer(row.vote, 'base64');
if(!writeStream.write(envelope + '\n')) {
logger.error(`Couldn't write vote`);
}
row = this.read()
}
})
.on('end', () => { // No more rows from Cassandra
writeStream.end();
writeStream.on('finish', () => {
logger.info(`Stream done writing`);
resolve();
});
})
.on('error', (err) => { // err is a response error from Cassandra
reject(err);
});
});
}
当我 运行 这样做时,它会将所有选票附加到一个文件中并可以正常下载。但是我有一堆problems/questions:
如果我向
/export
端点发出请求并且此函数 运行s,而它 运行ning 对应用程序的所有其他请求都非常慢或者在导出请求完成之前不要完成。我猜是因为事件循环被来自 Cassandra 流的所有这些事件占用(每秒数千次)?所有选票似乎都可以正常写入文件,但我几乎每次
writeStream.write()
调用都会得到false
并查看相应的记录消息(参见代码)?我知道我需要考虑 WritableStream 的背压和 'drain' 事件,所以理想情况下我会使用
pipe()
并将投票传送到一个文件,因为它已经构建在背压支持中(对吗?)但由于我需要处理每一行(转换为二进制并可能在将来添加来自其他行字段的其他数据),我将如何使用管道执行此操作?
这是 TransformStream
的完美用例:
const myTransform = new Transform({
readableObjectMode: true,
transform(row, encoding, callback) {
// Transform the row into something else
const item = new Buffer(row['vote'], 'base64');
callback(null, item);
}
});
client.stream(query, params, { prepare: true })
.pipe(myTransform)
.pipe(fileStream);
在 Node.js API Docs 中查看有关如何实施 TransformStream
的更多信息。