检测流的可写最后一个块

Detect stream's Writable last chunk

我创建了一个 Writable 流连接到我们系统中的一个大型管道,该管道在收到 BUFFER_SIZE 个块(对象)后写入数据库。

getStream() {
    const buffer = [];

    const stream = new Writable({
        objectMode: true,
        async write(chunk,enc,next) {
            buffer.push(chunk);
            if( buffer.length > BUFFER_SIZE ) {
                await insertToDB(buffer);
            }
            next();
        }
    });

    stream.on('finish', async () => {
        // insert last batch?
        if( buffer.length ) {
            await insertToDB(buffer);
        }
    });

    return stream;
}

async consumer() {
    await pipeline(...largePipeline, getStream());
    closeAll();
}

这工作正常,但我遇到的问题是 on('finish', ...) 事件处理程序调用得太晚,在 consumer() 函数中调用了 closeAll() 之后。

有没有办法让 write() 方法知道它刚刚收到最后一个 chunk?这样我就可以在调用最后一个 next() 之前刷新缓冲区,一切都会同步。

请注意在此代码库中,管道、消费者和编写器之间有非常严格的分离,我不能也不会交换承诺、标志或状态检查这些组件之间。可写流是一个独立的单元!我正在寻找 Node Streams 通过缓冲写入解决此问题的方法,必须有一种方法来检查 Writable 流是否最后被调用并等待它真正完成,但我无法理解它。

好的,我找到了正确的方法,Node Streams 在数据耗尽时调用 final(next) 方法,这就是你应该在“发布”之前完成你的写作的地方流:

getStream() {
    let buffer = [];

    return new Writable({
        objectMode: true,
        async write(chunk,enc,next) {
            buffer.push(chunk);
            if( buffer.length > BUFFER_SIZE ) {
                await insertToDB(buffer);
            }
            next();
        },
        async final(next) {
            // insert last batch?
            if( buffer.length ) {
                await insertToDB(buffer);
                buffer = [];
            }
            next();
        }

    });
}

async consumer() {
    await pipeline(...largePipeline, getStream());
    closeAll();
}

我还发现 writev() 函数是编写缓冲块的最佳方法,而不是在 Writable 流中实现您自己的缓冲区:

getStream() {
    return new Writable({
        objectMode: true,
        highWaterMark: BUFFER_SIZE,
        async writev(chunks, next) {
            await insertToDB( chunks.map( chunk => chunk.chunk ) );
            next();
        }
    });
}

它利用 highWaterMark 配置设置每次发送给你的块对象的数量,这允许 Node 更好地控制整个流管道的背压并简化你的 Writable 设计.