错误 [ERR_STREAM_PREMATURE_CLOSE]:节点管道流中的过早关闭

Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close in Node Pipeline stream

我正在使用 Node 的 stream.pipeline 功能将一些数据上传到 S3。我正在实施的基本想法是从请求中提取文件并将它们写入 S3。我有一个 pipeline 可以提取 zip 文件并将它们成功写入 S3。但是,我希望我的第二个 pipeline 发出相同的请求,但解压缩并将解压缩的文件写入 S3。管道代码如下所示:

pipeline(request.get(...), s3Stream(zipFileWritePath)),
pipeline(request.get(...), new unzipper.Parse(), etl.map(entry => entry.pipe(s3Stream(createWritePath(writePath, entry)))))

s3Stream 函数如下所示:

function s3Stream(file) {
    const pass = new stream.PassThrough()
    s3Store.upload(file, pass)
    return pass
}

第一个 pipeline 运行良好,目前在生产中运行良好。但是,在添加第二个管道时,出现以下错误:

Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
at Parse.onclose (internal/streams/end-of-stream.js:56:36)
at Parse.emit (events.js:187:15)
at Parse.EventEmitter.emit (domain.js:442:20)
at Parse.<anonymous> (/node_modules/unzipper/lib/parse.js:28:10)
at Parse.emit (events.js:187:15)
at Parse.EventEmitter.emit (domain.js:442:20)
at finishMaybe (_stream_writable.js:641:14)
at afterWrite (_stream_writable.js:481:3)
at onwrite (_stream_writable.js:471:7)
at /node_modules/unzipper/lib/PullStream.js:70:11
at afterWrite (_stream_writable.js:480:3)
at process._tickCallback (internal/process/next_tick.js:63:19)

任何想法可能导致此问题或解决此问题的解决方案将不胜感激!

TL;DR

使用管道时,您接受完全使用可读流,您不希望任何东西在可读结束之前停止。

深潜

经过一段时间处理这些恶作剧后,这里有一些更有用的信息。

import stream from 'stream'

const s1 = new stream.PassThrough()
const s2 = new stream.PassThrough()
const s3 = new stream.PassThrough()

s1.on('end', () => console.log('end 1'))
s2.on('end', () => console.log('end 2'))
s3.on('end', () => console.log('end 3'))
s1.on('close', () => console.log('close 1'))
s2.on('close', () => console.log('close 2'))
s3.on('close', () => console.log('close 3'))

stream.pipeline(
    s1,
    s2,
    s3,
    async s => { for await (_ of s) { } },
    err => console.log('end', err)
)

现在如果我调用 s2.end() 它将关闭所有父项

end 2
close 2
end 3
close 3

pipeline is the equivalent of s3(s2(s1)))

但是如果我调用 s2.destroy() 它会打印并销毁所有内容,这是你的问题,流在它正常结束之前被销毁,要么是错误,要么是 [=37= 中的 return/break/throws ]

close 2
end Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
    at PassThrough.onclose (internal/streams/end-of-stream.js:117:38)
    at PassThrough.emit (events.js:327:22)
    at emitCloseNT (internal/streams/destroy.js:81:10)
    at processTicksAndRejections (internal/process/task_queues.js:83:21) {
  code: 'ERR_STREAM_PREMATURE_CLOSE'
}
close 1
close 3

你不能让其中一个流无法捕获它们的错误

stream.pipeline() leaves dangling event listeners on the streams after theallback has been invoked. In the case of reuse of streams after failure, this can cause event listener leaks and swallowed errors.

node source (14.4)

  const onclose = () => {
    if (readable && !readableEnded) {
      if (!isReadableEnded(stream))
        return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
    }
    if (writable && !writableFinished) {
      if (!isWritableFinished(stream))
        return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
    }
    callback.call(stream);
  };