Event listener for when a file has finished streaming to AWS S3 upload api?

I'm creating a file backup between Google Drive and AWS S3. I create readable stream promises by downloading each file with the Google Drive get API and piping the data to AWS S3. Since I have many files, each promise is added to a queue, and a new one is started only when the current promise resolves.

What I'm struggling with is getting the promise to resolve only when the file has finished uploading to AWS S3, not when it has finished downloading.

I thought that using .on('finish', () => {resolve()}) would do this, but it doesn't seem to work.

Here's a sample of my code:

// download stream of NON gdocs files and pipe to destination
const getGFileContent = async (fileObj) => {
  let fileExt = fileObj.path.join('/').concat('/',fileObj.name)

  return drive.files.get({fileId: fileObj.id, mimeType: fileObj.mimeType, alt: 'media'}, {responseType: 'stream'})
    .then(res => {
      return new Promise((resolve, reject) => {
        res.data
          .pipe(uploadS3(fileExt))
          .on('end', () => {console.log(`Done downloading file: ${fileExt}`)})
          .on('finish', () => {resolve(console.log(`File Backup Complete: ${fileExt}`))})
          .on('error', err => {reject(console.error(`Error downloading file: ${err}`))})
      })
    })
}

// upload a file to AWS S3 by passing the file stream from getGFileContent into the 'body' parameter of the upload
const uploadS3 = (filePath) => {
  let pass = new stream.PassThrough()
  let params = {
    Bucket: awsBucketName, // bucket-name
    Key: filePath, // file will be saved as bucket-name/[uniquekey.csv]
    Body: pass  // file data passed through stream
  } 
  new aws.S3().upload(params).promise()
    .then(() => console.log(`Successfully uploaded to S3: ${filePath}`))
    .catch( err => console.log(`Error, unable to upload to S3: ${err}`))
  return pass
}

The first thing that comes to mind is making the uploadS3 function async and awaiting the upload to finish before returning the pass-through stream. But that won't work: the function would then return a Promise, and .pipe() only accepts a stream object.
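
To make that failure mode concrete, here is a hypothetical sketch of the async approach, reusing the names from the question's code (uploadS3Async is illustrative, not a suggested fix):

// Hypothetical illustration only: this is the approach that does NOT work.
const uploadS3Async = async (filePath) => {
  let pass = new stream.PassThrough()
  // Awaiting here stalls: nothing is piping into `pass` yet, so the
  // upload can never finish and the stream is never returned.
  await new aws.S3().upload({Bucket: awsBucketName, Key: filePath, Body: pass}).promise()
  return pass
}

// Even setting the stall aside, an async function returns a Promise,
// while .pipe() expects a stream, so this would throw at runtime:
// res.data.pipe(uploadS3Async(fileExt))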

Instead, you could refactor your code so that getGFileContent returns a readable stream promise.

Then, uploadS3 accepts the readable stream as a parameter and returns an S3 upload promise.

Finally, add an async backupFile function that awaits both the GDrive stream and the upload promise before moving on. This also keeps the functions tidy and clear, each one with a single responsibility.

Sample code:

const AWS = require('aws-sdk');
const fs = require('fs');

// update the config before constructing the S3 client so the credentials apply
AWS.config.update({
    accessKeyId: '----',
    secretAccessKey: '----',
});

const s3 = new AWS.S3();

const backupFile = async (file) => {
    const fileStream = await getGFileStream(file);
    try {
        await uploadStreamToS3(fileStream);
        console.log(`S3 Backup of ${fileStream.path} completed`)
    } catch (err) {
        console.log(`error during file upload ${err}`);
    }
}

const getGFileStream = async (fileObj) => {
    // TODO: logic to find and get the file. Returns a readableStream promise 
    const fileStream = fs.createReadStream('./largeFile.zip');
    console.log('File ${...} read from Google Drive');
    return fileStream;
}

const uploadStreamToS3 = (fileStream) => {
    const params = {Bucket: 'test-bucket', Key: 'key', Body: fileStream}
    console.log(`Starting to upload ${fileStream.path} to S3`);
    return s3.upload(params).promise();
}

backupFile({id: 'mockTestFile'});
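
Since the question processes many files one at a time, here is a minimal sketch of that queue, reusing backupFile from above (backupAll and the mock file list are illustrative):

const backupAll = async (files) => {
    for (const file of files) {
        // each backup resolves only after the S3 upload has finished,
        // so the next file does not start until the current one is done
        await backupFile(file);
    }
}

backupAll([{id: 'mockTestFile1'}, {id: 'mockTestFile2'}]);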