使用 nodejs 管道流下载大文件会导致大量内存使用和 OOM 错误

Downloading large files using nodejs piped stream causes huge memory usage and OOM Error

我正在使用 Node.js 从服务器下载大文件(300MB),并将响应通过管道(pipe)传输到文件写入流。据我对 Node.js 管道的理解,数据流由 Node 负责管理,我不必自己处理 drain 等事件。我遇到的问题是:应用程序所在的 Docker 容器的内存使用量会随着正在下载的文件大小同步增长(也就是说,文件似乎被整个保存在了内存中)。即使我删除了容器中的该文件,这部分内存占用依然存在。下面附上我用于创建请求和管道的代码,供参考。代码可以正常运行,但会导致性能问题,例如内存/CPU 占用过高,并最终因 OOM 错误而崩溃。我不明白自己哪里做错了。

// Start the firmware download from firmwareURL. maxAttempts, retryDelay and
// retryStrategy are retry-related options passed to request() with the URL.
// NOTE(review): `request.RetryStrategies` suggests `request` is the
// requestretry wrapper rather than the plain `request` package — confirm. If
// so, verify whether it buffers the whole response body in memory, which would
// defeat streaming the download to disk.
let req = request({
      url: firmwareURL,
      maxAttempts: 5,
      retryDelay: 5000,
      retryStrategy: request.RetryStrategies.HTTPOrNetworkError});

    // 1. Handle the server response: on HTTP 200 open the destination file and
    //    pipe the download into it; on any other status mark the operation as
    //    aborted so later request events are ignored.
    req.on('response', (res) => {
      console.log(methodName, 'Download response statusCode:', res.statusCode);
      if (res.statusCode === 200) {
        abortOperation = false;
        isStarted = "yes";
        // 1.1 Build the local destination path and open a write stream with a
        // custom highWaterMark.
        // Path layout: firmwareDirectory / ip / firmwareVersion.<extension taken from firmwareURL>
        fileStoragePath = `${firmwareDirectory}/${ip}`;
              console.log("filestoragepath is",fileStoragePath);
        fileName = `${firmwareVersion}.${firmwareURL.split(".").pop()}`;
        // Temporary location where the downloaded file is stored
        tempFile = `${fileStoragePath}/${fileName}`;
              console.log("tempfile is",tempFile);
        writestream = fs.createWriteStream(tempFile, {
          highWaterMark: Math.pow(2,20 )
        }); // 2^20 bytes = 1 MiB highWaterMark; can be increased
        writestream.on('error', function (err) {
          // Write-stream error: flag the download as aborted and drop the
          // progress pointer for this device.
          console.log('Error while creating a file write stream' + err);
          abortOperation = true;
          isStarted = "no";
          _deleteProgressPointer(ip);
        });
        // 1.2 Read the content length of the current response.
        // parseInt yields NaN when the content-length header is absent.
        size = parseInt(res.headers['content-length'], 10);
        console.log(methodName, 'File size is:', size);
        // Pipe the request object itself (not `res`) into the file stream.
        req.pipe(writestream);
      } else {
        // 1.3 Non-200 status: ignore subsequent request events for this download
        console.log(methodName, 'File not found on server. res.statusCode:', res.statusCode);
        abortOperation = true;
        isStarted = "no";
        _deleteProgressPointer(ip);
      }
    });
    // 3. On a request error, mark the operation as aborted so subsequent
    //    request events are ignored, and drop the progress pointer.
    // NOTE(review): the log text says "File not found on server" although this
    // handler runs for every 'error' event emitted by the request.
    req.on('error', (error) => {
      console.log(methodName, 'File not found on server:', error);
      abortOperation = true;
      isStarted = "no";
      _deleteProgressPointer(ip);
    });
    // 4. When the request stream ends, close the file stream and, once it has
    //    finished, post-process the downloaded file. Skipped entirely when an
    //    earlier handler set abortOperation.
    // NOTE(review): this snippet is truncated — the `if (!abortOperation)`
    // block and the req.on('end', ...) call are never closed in the visible code.
    req.on('end', () => {
      if (!abortOperation) {
        // NOTE(review): only guards against null; if writestream can still be
        // undefined here, writestream.end() would throw — confirm how it is
        // initialised.
        if (null !== writestream) {
          writestream.end();
          writestream.on('finish', function () {
            console.log(methodName, `File successfully downloaded for device ${ip} of firmware version ${firmwareVersion}`);
            try {

              // File extraction/storage operation.
              // Only proceed when the URL's file extension is in ALLOWED_EXTENSION.
              if (ALLOWED_EXTENSION.includes(firmwareURL.split(".").pop())) {
                try {
                  //req.unpipe(writestream);
                  fileio.removeFile(tempFile); //deleting downloaded file to avoid storage issues
                  // NOTE(review): the `});` below has no matching opener in
                  // this snippet (code appears to have been cut out), so the
                  // fragment does not parse as written.
                  });
                  console.log("upgrade ended");
                  // NOTE(review): upgradeOp is not defined in the visible code,
                  // and a value returned from an event listener is ignored.
                  return upgradeOp;
                } catch (error) {
                  // NOTE(review): message mentions renaming, but the visible
                  // try body only removes the file.
                  console.log(`Error while renamining file: ${tempFile}`);
                }
              } else {
                // Extension not allowed: discard the downloaded file.
                console.log(methodName, ` Not an valid file extension: ${tempFile}`);
                fileio.removeFile(tempFile);
                console.log(methodName, ` Invalid: ${tempFile} removed`);
              }
              // delete the progress pointer
              _deleteProgressPointer(ip);

            } catch (error) {
              // delete the progress pointer even when post-processing failed
              _deleteProgressPointer(ip);
              console.log(methodName, `Error during read/write operation :${error}`);
            }
          });
        }


问题在于您使用的是 requestretry 包,它并不真正支持流式传输。它总是带着一个回调去调用 request,并返回一个以完整响应 resolve 的 Promise。而 request 库在收到这样的回调时,会读取整个响应体,也就是把完整的响应缓冲在内存中。这不是你想要的。

我没有看到使用 requestretry 进行流式传输的方法,所以你应该直接使用 request 包(或者,考虑到它的弃用,它的后继库之一)并自己处理重试逻辑。