NodeJS Streams 行为管道与承诺
NodeJS Streams behaviour pipeline vs promise
我正在执行一些代码来获取图像,使用 sharp 库将其转换为 png 和 jpg 两种格式,并且 return 这两种流稍后将上传到 S3 存储桶。
我提出了两种不同的解决方案,一种使用 Promise,另一种使用 stream.pipeline。
但是,由于某种原因,管道版本 运行ning 比承诺的要慢得多。
这是重现该行为的代码(运行 节点 14)
const sharp = require('sharp')
const fs = require('fs')
const util = require('util')
const stream = require('stream')
const pipeline = util.promisify(stream.pipeline);
console.time('resize')
const resizeJobPipeline = async (readableStream) => {
const sharpStream = sharp({
failOnError: false
}).resize({width: 800, height: 800, fit: 'inside'})
// using PassThrough here because in the final code will have to pass this stream to s3 upload
const memoryPng = new stream.PassThrough()
const memoryJpg = new stream.PassThrough()
// Have to await each pipeline sepparately,
// if wrap them in a Promise.all, then the images don't get fully processed/become corrupted
await pipeline(readableStream, sharpStream.clone().png(), memoryPng)
await pipeline(readableStream, sharpStream.clone().jpeg(), memoryJpg)
return [memoryPng, memoryJpg]
}
const resizeJobPromise = async (readableStream) => {
const sharpStream = sharp({
failOnError: false
}).resize({width: 800, height: 800, fit: 'inside'})
const promises = []
promises.push(sharpStream.clone().png().pipe(new stream.PassThrough()))
promises.push(sharpStream.clone().jpeg().pipe(new stream.PassThrough()))
readableStream.pipe(sharpStream)
return await Promise.all(promises)
}
const readStream = fs.createReadStream('big_img.jpg')
// resizeJobPromise(readStream).then(res => {
// res[0].pipe(fs.createWriteStream('resized.png'))
// res[1].pipe(fs.createWriteStream('resized.jpg'))
// console.timeEnd('resize')
// }).catch(err => {
// console.log(err)
// })
resizeJobPipeline(readStream).then(res => {
res[0].pipe(fs.createWriteStream('resized.png'))
res[1].pipe(fs.createWriteStream('resized.jpg'))
console.timeEnd('resize')
}).catch(err => {
console.log(err)
})
如果我 运行 resizeJobPipeline 版本,使用大约 20mb 的图像,我的平均执行时间约为 500 毫秒
但是,如果评论这个版本和 运行 resizeJobPromise 版本,使用相同的图像,我得到的平均时间只有 ~7 毫秒!
通过依次等待两个管道,我希望时间可能会增加一倍,但不是 100 倍。
我读到管道版本使用起来更安全,因为它会自动处理可读错误并关闭可写流以防止内存泄漏,而在 promise 版本上我必须手动处理这些错误。
我在 promise 版本中做错了吗?代码背后可能发生了什么,使其具有如此高的性能?
Is there something wrong I'm doing in the promise version?
是的,您没有测量流的执行时间。注意
promises.push(sharpStream.clone().png().pipe(new stream.PassThrough()))
promises.push(sharpStream.clone().jpeg().pipe(new stream.PassThrough()))
只是将流对象推入一个数组,将这些对象传递给 Promise.all
不会等待流完成,而是立即用流对象完成。您也可以省略此函数中的 promise 内容。
您应该做的是 pipeline
将流 file/s3 写入流:
const sharp = require('sharp')
const fs = require('fs')
const util = require('util')
const stream = require('stream')
const pipeline = util.promisify(stream.pipeline)
function resizeJob() {
const sharpStream = sharp({
failOnError: false
}).resize({width: 800, height: 800, fit: 'inside'})
const source = fs.createReadStream('big_img.jpg')
// using writeStream here, the final code will do s3 upload instead
const pngTarget = fs.createWriteStream('resized.png')
const jpgTarget = fs.createWriteStream('resized.jpg')
const promises = [
pipeline(readableStream, sharpStream), // don't do this piping twice!
pipeline(sharpStream.clone().png(), memoryPng),
pipeline(sharpStream.clone().jpeg(), memoryJpg),
]
return Promise.all(promises)
}
console.time('resize')
resizeJob().catch(err => {
console.log(err)
}).then(() => {
console.timeEnd('resize')
})
我正在执行一些代码来获取图像,使用 sharp 库将其转换为 png 和 jpg 两种格式,并且 return 这两种流稍后将上传到 S3 存储桶。
我提出了两种不同的解决方案,一种使用 Promise,另一种使用 stream.pipeline。 但是,由于某种原因,管道版本 运行ning 比承诺的要慢得多。
这是重现该行为的代码(运行 节点 14)
const sharp = require('sharp')
const fs = require('fs')
const util = require('util')
const stream = require('stream')
const pipeline = util.promisify(stream.pipeline);
console.time('resize')
const resizeJobPipeline = async (readableStream) => {
const sharpStream = sharp({
failOnError: false
}).resize({width: 800, height: 800, fit: 'inside'})
// using PassThrough here because in the final code will have to pass this stream to s3 upload
const memoryPng = new stream.PassThrough()
const memoryJpg = new stream.PassThrough()
// Have to await each pipeline sepparately,
// if wrap them in a Promise.all, then the images don't get fully processed/become corrupted
await pipeline(readableStream, sharpStream.clone().png(), memoryPng)
await pipeline(readableStream, sharpStream.clone().jpeg(), memoryJpg)
return [memoryPng, memoryJpg]
}
const resizeJobPromise = async (readableStream) => {
const sharpStream = sharp({
failOnError: false
}).resize({width: 800, height: 800, fit: 'inside'})
const promises = []
promises.push(sharpStream.clone().png().pipe(new stream.PassThrough()))
promises.push(sharpStream.clone().jpeg().pipe(new stream.PassThrough()))
readableStream.pipe(sharpStream)
return await Promise.all(promises)
}
const readStream = fs.createReadStream('big_img.jpg')
// resizeJobPromise(readStream).then(res => {
// res[0].pipe(fs.createWriteStream('resized.png'))
// res[1].pipe(fs.createWriteStream('resized.jpg'))
// console.timeEnd('resize')
// }).catch(err => {
// console.log(err)
// })
resizeJobPipeline(readStream).then(res => {
res[0].pipe(fs.createWriteStream('resized.png'))
res[1].pipe(fs.createWriteStream('resized.jpg'))
console.timeEnd('resize')
}).catch(err => {
console.log(err)
})
如果我 运行 resizeJobPipeline 版本,使用大约 20mb 的图像,我的平均执行时间约为 500 毫秒
但是,如果评论这个版本和 运行 resizeJobPromise 版本,使用相同的图像,我得到的平均时间只有 ~7 毫秒!
通过依次等待两个管道,我希望时间可能会增加一倍,但不是 100 倍。
我读到管道版本使用起来更安全,因为它会自动处理可读错误并关闭可写流以防止内存泄漏,而在 promise 版本上我必须手动处理这些错误。
我在 promise 版本中做错了吗?代码背后可能发生了什么,使其具有如此高的性能?
Is there something wrong I'm doing in the promise version?
是的,您没有测量流的执行时间。注意
promises.push(sharpStream.clone().png().pipe(new stream.PassThrough()))
promises.push(sharpStream.clone().jpeg().pipe(new stream.PassThrough()))
只是将流对象推入一个数组,将这些对象传递给 Promise.all
不会等待流完成,而是立即用流对象完成。您也可以省略此函数中的 promise 内容。
您应该做的是 pipeline
将流 file/s3 写入流:
const sharp = require('sharp')
const fs = require('fs')
const util = require('util')
const stream = require('stream')
const pipeline = util.promisify(stream.pipeline)
function resizeJob() {
const sharpStream = sharp({
failOnError: false
}).resize({width: 800, height: 800, fit: 'inside'})
const source = fs.createReadStream('big_img.jpg')
// using writeStream here, the final code will do s3 upload instead
const pngTarget = fs.createWriteStream('resized.png')
const jpgTarget = fs.createWriteStream('resized.jpg')
const promises = [
pipeline(readableStream, sharpStream), // don't do this piping twice!
pipeline(sharpStream.clone().png(), memoryPng),
pipeline(sharpStream.clone().jpeg(), memoryJpg),
]
return Promise.all(promises)
}
console.time('resize')
resizeJob().catch(err => {
console.log(err)
}).then(() => {
console.timeEnd('resize')
})