如何正确清除节点中可读流的内部缓冲区?
How to properly clear the internal buffer of a readable stream in node?
我正在处理我的自定义转换流。由于我手动处理缓冲和缓存,因此我需要确保在重新传输时不会自动发送任何块。为此,我应该在尝试调用 .pipe()
.
之前 clear/reset 内部缓冲区(写入队列)
由于我的流逻辑很复杂,我做了这个简化的例子来帮助你更好地理解我要解决的问题:
const { Readable, Writable } = require('stream')
const wait = ms => new Promise(res => setTimeout(res, ms))
;(async () => {
const readable = new Readable()
readable._read = () => {}
const writable = new Writable()
writable._write = (chunk, _encoding, callback) => {
console.log(chunk.toString())
callback()
}
readable.pipe(writable)
await wait(100) // This small delay replaces listening to the pipe/unpipe events
readable.push('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')
console.log('unpipe')
readable.unpipe(writable)
await wait(100)
readable.push('bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb')
// readable.clearInternalBufferOrWhatever() // ???
console.log('pipe')
readable.pipe(writable)
await wait(100)
readable.push('ccccccccccccccccccccccccccccccc')
})()
// => aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa (Expected ✔)
// unpipe
// pipe
// bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb (Unexpected ❌)
// ccccccccccccccccccccccccccccccc (Expected ✔)
如您所见,可写流已成功接收到第二个块 ('bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb'
),这正是我要避免的。有什么想法吗?
我想通了!流有一个名为 _readableState
的内部 属性,它包含内部缓冲区。只需两行代码就可以清除它:
readable._readableState.buffer.clear()
readable._readableState.length = 0
请注意,_readableState
并不安全,因为它是内部的且未记录,并且随时可能发生重大更改。但是,此解决方案已确认适用于节点 v8.10.0 和 v14.17.2。
这是一个完整的工作示例:
const { Readable, Writable } = require('stream')
const wait = ms => new Promise(res => setTimeout(res, ms))
;(async () => {
const readable = new Readable()
readable._read = () => {}
const writable = new Writable()
writable._write = (chunk, _encoding, callback) => {
console.log(chunk.toString())
callback()
}
readable.pipe(writable)
await wait(100)
readable.push('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')
console.log('unpipe')
readable.unpipe(writable)
await wait(100)
readable.push('bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb')
readable._readableState.buffer.clear()
readable._readableState.length = 0
console.log('pipe')
readable.pipe(writable)
await wait(100)
readable.push('ccccccccccccccccccccccccccccccc')
})()
// => aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
// unpipe
// pipe
// ccccccccccccccccccccccccccccccc
我正在处理我的自定义转换流。由于我手动处理缓冲和缓存,因此我需要确保在重新传输时不会自动发送任何块。为此,我应该在尝试调用 .pipe()
.
由于我的流逻辑很复杂,我做了这个简化的例子来帮助你更好地理解我要解决的问题:
const { Readable, Writable } = require('stream')
const wait = ms => new Promise(res => setTimeout(res, ms))
;(async () => {
const readable = new Readable()
readable._read = () => {}
const writable = new Writable()
writable._write = (chunk, _encoding, callback) => {
console.log(chunk.toString())
callback()
}
readable.pipe(writable)
await wait(100) // This small delay replaces listening to the pipe/unpipe events
readable.push('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')
console.log('unpipe')
readable.unpipe(writable)
await wait(100)
readable.push('bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb')
// readable.clearInternalBufferOrWhatever() // ???
console.log('pipe')
readable.pipe(writable)
await wait(100)
readable.push('ccccccccccccccccccccccccccccccc')
})()
// => aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa (Expected ✔)
// unpipe
// pipe
// bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb (Unexpected ❌)
// ccccccccccccccccccccccccccccccc (Expected ✔)
如您所见,可写流已成功接收到第二个块 ('bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb'
),这正是我要避免的。有什么想法吗?
我想通了!流有一个名为 _readableState
的内部 属性,它包含内部缓冲区。只需两行代码就可以清除它:
readable._readableState.buffer.clear()
readable._readableState.length = 0
请注意,_readableState
并不安全,因为它是内部的且未记录,并且随时可能发生重大更改。但是,此解决方案已确认适用于节点 v8.10.0 和 v14.17.2。
这是一个完整的工作示例:
const { Readable, Writable } = require('stream')
const wait = ms => new Promise(res => setTimeout(res, ms))
;(async () => {
const readable = new Readable()
readable._read = () => {}
const writable = new Writable()
writable._write = (chunk, _encoding, callback) => {
console.log(chunk.toString())
callback()
}
readable.pipe(writable)
await wait(100)
readable.push('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')
console.log('unpipe')
readable.unpipe(writable)
await wait(100)
readable.push('bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb')
readable._readableState.buffer.clear()
readable._readableState.length = 0
console.log('pipe')
readable.pipe(writable)
await wait(100)
readable.push('ccccccccccccccccccccccccccccccc')
})()
// => aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
// unpipe
// pipe
// ccccccccccccccccccccccccccccccc