异步转换流不能并行工作吗?
Can asynchronous transform streams not work in parallel?
我正在从 NodeJS 中的流中读取数据,然后使用转换流中的异步函数处理该数据。我希望此转换流并行启动对异步函数的多个调用,但它似乎一次执行一个。
为了说明我的期望,我在下面编写了一个小程序,它生成从 0
到 limit - 1
的数字,然后将其传递给一个转换流,该转换流会以较小的延迟递增每个数字。如果你 运行 下面的程序,数字 1 到 20 将按顺序记录,所有的都有一个小的延迟。
由于默认 highWaterMark
是 16,我希望它们以 16 + 4 的块记录。是否有可能获得我想要的行为,如果可以,如何实现?
即读取流将非常快速地生成数据,转换速度较慢,但应该接收到高水位线,然后等待其数据处理完毕,然后从读取流中请求更多。
const stream = require('stream')
const limit = 20
let index = 0
const numberStream = new stream.Readable({
objectMode: true,
read (amount) {
const innerLimit = Math.min(index + amount, limit)
while (index < innerLimit) {
this.push(index++)
}
if (index === limit) {
this.push(null)
}
},
})
const delayedIncStream = new stream.Transform({
objectMode: true,
transform (item, _, cb) {
setTimeout(() => cb(null, item + 1), 100)
},
})
const resultStream = numberStream.pipe(delayedIncStream)
resultStream.on('data', console.log)
答案是否,如文档本节最后一部分所述:https://nodejs.org/api/stream.html#stream_transform_transform_chunk_encoding_callback
transform._transform() is never called in parallel; streams implement a queue mechanism, and to receive the next chunk, callback must be called, either synchronously or asynchronously.
您可以使用 nodejs 包 parallel-transform-stream 来实现这一点,同时保留转换数据的顺序。
您的示例可以重写如下,以并行转换所有数字:
const stream = require('stream')
const ParallelTransform = require('parallel-transform-stream').default
const limit = 20
let index = 0
const numberStream = new stream.Readable({
objectMode: true,
read (amount) {
const innerLimit = Math.min(index + amount, limit)
while (index < innerLimit) {
this.push(index++)
}
if (index === limit) {
this.push(null)
}
},
})
const delayedIncStream = new (ParallelTransform.create((item, _, cb) => {
setTimeout(() => cb(null, item + 1), 100)
}))({
objectMode: true,
maxParallel: 20
})
const resultStream = numberStream.pipe(delayedIncStream)
resultStream.on('data', console.log)
我正在从 NodeJS 中的流中读取数据,然后使用转换流中的异步函数处理该数据。我希望此转换流并行启动对异步函数的多个调用,但它似乎一次执行一个。
为了说明我的期望,我在下面编写了一个小程序,它生成从 0
到 limit - 1
的数字,然后将其传递给一个转换流,该转换流会以较小的延迟递增每个数字。如果你 运行 下面的程序,数字 1 到 20 将按顺序记录,所有的都有一个小的延迟。
由于默认 highWaterMark
是 16,我希望它们以 16 + 4 的块记录。是否有可能获得我想要的行为,如果可以,如何实现?
即读取流将非常快速地生成数据,转换速度较慢,但应该接收到高水位线,然后等待其数据处理完毕,然后从读取流中请求更多。
const stream = require('stream')
const limit = 20
let index = 0
const numberStream = new stream.Readable({
objectMode: true,
read (amount) {
const innerLimit = Math.min(index + amount, limit)
while (index < innerLimit) {
this.push(index++)
}
if (index === limit) {
this.push(null)
}
},
})
const delayedIncStream = new stream.Transform({
objectMode: true,
transform (item, _, cb) {
setTimeout(() => cb(null, item + 1), 100)
},
})
const resultStream = numberStream.pipe(delayedIncStream)
resultStream.on('data', console.log)
答案是否,如文档本节最后一部分所述:https://nodejs.org/api/stream.html#stream_transform_transform_chunk_encoding_callback
transform._transform() is never called in parallel; streams implement a queue mechanism, and to receive the next chunk, callback must be called, either synchronously or asynchronously.
您可以使用 nodejs 包 parallel-transform-stream 来实现这一点,同时保留转换数据的顺序。
您的示例可以重写如下,以并行转换所有数字:
const stream = require('stream')
const ParallelTransform = require('parallel-transform-stream').default
const limit = 20
let index = 0
const numberStream = new stream.Readable({
objectMode: true,
read (amount) {
const innerLimit = Math.min(index + amount, limit)
while (index < innerLimit) {
this.push(index++)
}
if (index === limit) {
this.push(null)
}
},
})
const delayedIncStream = new (ParallelTransform.create((item, _, cb) => {
setTimeout(() => cb(null, item + 1), 100)
}))({
objectMode: true,
maxParallel: 20
})
const resultStream = numberStream.pipe(delayedIncStream)
resultStream.on('data', console.log)