如何正确清除节点中可读流的内部缓冲区?

How to properly clear the internal buffer of a readable stream in node?

我正在处理我的自定义转换流。由于我手动处理缓冲和缓存,因此我需要确保在重新传输时不会自动发送任何块。为此,我应该在尝试调用 .pipe().

之前 clear/reset 内部缓冲区(写入队列)

由于我的流逻辑很复杂,我做了这个简化的例子来帮助你更好地理解我要解决的问题:

const { Readable, Writable } = require('stream')
const wait = ms => new Promise(res => setTimeout(res, ms))

;(async () => {
    const readable = new Readable()
    readable._read = () => {}

    const writable = new Writable()
    writable._write = (chunk, _encoding, callback) => {
        console.log(chunk.toString())
        callback()
    }

    readable.pipe(writable)
    await wait(100) // This small delay replaces listening to the pipe/unpipe events 

    readable.push('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')

    console.log('unpipe')
    readable.unpipe(writable)
    await wait(100)

    readable.push('bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb')

    // readable.clearInternalBufferOrWhatever() // ???

    console.log('pipe')
    readable.pipe(writable)
    await wait(100)

    readable.push('ccccccccccccccccccccccccccccccc')
})()

// => aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa (Expected ✔)
//    unpipe
//    pipe
//    bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb (Unexpected ❌)
//    ccccccccccccccccccccccccccccccc (Expected ✔)

如您所见,可写流已成功接收到第二个块 ('bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb'),这正是我要避免的。有什么想法吗?

我想通了!流有一个名为 _readableState 的内部 属性,它包含内部缓冲区。只需两行代码就可以清除它:

readable._readableState.buffer.clear()
readable._readableState.length = 0

请注意,_readableState 并不安全,因为它是内部的且未记录,并且随时可能发生重大更改。但是,此解决方案已确认适用于节点 v8.10.0 和 v14.17.2。

这是一个完整的工作示例:

const { Readable, Writable } = require('stream')
const wait = ms => new Promise(res => setTimeout(res, ms))

;(async () => {
    const readable = new Readable()
    readable._read = () => {}

    const writable = new Writable()
    writable._write = (chunk, _encoding, callback) => {
        console.log(chunk.toString())
        callback()
    }

    readable.pipe(writable)
    await wait(100)
    readable.push('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')

    console.log('unpipe')
    readable.unpipe(writable)
    await wait(100)

    readable.push('bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb')

    readable._readableState.buffer.clear()
    readable._readableState.length = 0

    console.log('pipe')
    readable.pipe(writable)
    await wait(100)

    readable.push('ccccccccccccccccccccccccccccccc')
})()

// => aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
//    unpipe
//    pipe
//    ccccccccccccccccccccccccccccccc