检测流的可写最后一个块
Detect stream's Writable last chunk
我创建了一个 Writable
流连接到我们系统中的一个大型管道,该管道在收到 BUFFER_SIZE
个块(对象)后写入数据库。
getStream() {
const buffer = [];
const stream = new Writable({
objectMode: true,
async write(chunk,enc,next) {
buffer.push(chunk);
if( buffer.length > BUFFER_SIZE ) {
await insertToDB(buffer);
}
next();
}
});
stream.on('finish', async () => {
// insert last batch?
if( buffer.length ) {
await insertToDB(buffer);
}
});
return stream;
}
async consumer() {
await pipeline(...largePipeline, getStream());
closeAll();
}
这工作正常,但我遇到的问题是 on('finish', ...)
事件处理程序调用得太晚,在 consumer()
函数中调用了 closeAll()
之后。
有没有办法让 write()
方法知道它刚刚收到最后一个 chunk
?这样我就可以在调用最后一个 next()
之前刷新缓冲区,一切都会同步。
请注意在此代码库中,管道、消费者和编写器之间有非常严格的分离,我不能也不会交换承诺、标志或状态检查这些组件之间。可写流是一个独立的单元!我正在寻找 Node Streams 通过缓冲写入解决此问题的方法,必须有一种方法来检查 Writable
流是否最后被调用并等待它真正完成,但我无法理解它。
好的,我找到了正确的方法,Node Streams 在数据耗尽时调用 final(next)
方法,这就是你应该在“发布”之前完成你的写作的地方流:
getStream() {
let buffer = [];
return new Writable({
objectMode: true,
async write(chunk,enc,next) {
buffer.push(chunk);
if( buffer.length > BUFFER_SIZE ) {
await insertToDB(buffer);
}
next();
},
async final(next) {
// insert last batch?
if( buffer.length ) {
await insertToDB(buffer);
buffer = [];
}
next();
}
});
}
async consumer() {
await pipeline(...largePipeline, getStream());
closeAll();
}
我还发现 writev()
函数是编写缓冲块的最佳方法,而不是在 Writable
流中实现您自己的缓冲区:
getStream() {
return new Writable({
objectMode: true,
highWaterMark: BUFFER_SIZE,
async writev(chunks, next) {
await insertToDB( chunks.map( chunk => chunk.chunk ) );
next();
}
});
}
它利用 highWaterMark
配置设置每次发送给你的块对象的数量,这允许 Node 更好地控制整个流管道的背压并简化你的 Writable
设计.
我创建了一个 Writable
流连接到我们系统中的一个大型管道,该管道在收到 BUFFER_SIZE
个块(对象)后写入数据库。
getStream() {
const buffer = [];
const stream = new Writable({
objectMode: true,
async write(chunk,enc,next) {
buffer.push(chunk);
if( buffer.length > BUFFER_SIZE ) {
await insertToDB(buffer);
}
next();
}
});
stream.on('finish', async () => {
// insert last batch?
if( buffer.length ) {
await insertToDB(buffer);
}
});
return stream;
}
async consumer() {
await pipeline(...largePipeline, getStream());
closeAll();
}
这工作正常,但我遇到的问题是 on('finish', ...)
事件处理程序调用得太晚,在 consumer()
函数中调用了 closeAll()
之后。
有没有办法让 write()
方法知道它刚刚收到最后一个 chunk
?这样我就可以在调用最后一个 next()
之前刷新缓冲区,一切都会同步。
请注意在此代码库中,管道、消费者和编写器之间有非常严格的分离,我不能也不会交换承诺、标志或状态检查这些组件之间。可写流是一个独立的单元!我正在寻找 Node Streams 通过缓冲写入解决此问题的方法,必须有一种方法来检查 Writable
流是否最后被调用并等待它真正完成,但我无法理解它。
好的,我找到了正确的方法,Node Streams 在数据耗尽时调用 final(next)
方法,这就是你应该在“发布”之前完成你的写作的地方流:
getStream() {
let buffer = [];
return new Writable({
objectMode: true,
async write(chunk,enc,next) {
buffer.push(chunk);
if( buffer.length > BUFFER_SIZE ) {
await insertToDB(buffer);
}
next();
},
async final(next) {
// insert last batch?
if( buffer.length ) {
await insertToDB(buffer);
buffer = [];
}
next();
}
});
}
async consumer() {
await pipeline(...largePipeline, getStream());
closeAll();
}
我还发现 writev()
函数是编写缓冲块的最佳方法,而不是在 Writable
流中实现您自己的缓冲区:
getStream() {
return new Writable({
objectMode: true,
highWaterMark: BUFFER_SIZE,
async writev(chunks, next) {
await insertToDB( chunks.map( chunk => chunk.chunk ) );
next();
}
});
}
它利用 highWaterMark
配置设置每次发送给你的块对象的数量,这允许 Node 更好地控制整个流管道的背压并简化你的 Writable
设计.