Event listener for when a file has finished streaming to AWS S3 upload api?
I'm building a file backup between Google Drive and AWS S3. I create readable-stream promises by downloading each file with the Google Drive get API and piping the data across to AWS S3.
Since there are many files, each promise is added to a queue, and a new one only starts once the current one resolves.
I'm struggling to make each promise resolve only when the file has finished uploading to AWS S3, not merely when it has finished downloading.
I thought using .on('finish', () => {resolve()})
would do it, but it doesn't seem to work.
Here is a sample of my code:
// download stream of NON-gdocs files and pipe to destination
const getGFileContent = async (fileObj) => {
  let fileExt = fileObj.path.join('/').concat('/', fileObj.name)
  return drive.files.get({fileId: fileObj.id, mimeType: fileObj.mimeType, alt: 'media'}, {responseType: 'stream'})
    .then(res => {
      return new Promise((resolve, reject) => {
        res.data
          .pipe(uploadS3(fileExt))
          .on('end', () => {console.log(`Done downloading file: ${fileExt}`)})
          .on('finish', () => {resolve(console.log(`File Backup Complete: ${fileExt}`))})
          .on('error', err => {reject(console.error(`Error downloading file: ${err}`))})
      })
    })
}
// upload a file to AWS S3 by passing the file stream from getGFileContent into the 'Body' parameter of the upload
const uploadS3 = (filePath) => {
  let pass = new stream.PassThrough()
  let params = {
    Bucket: awsBucketName, // bucket-name
    Key: filePath, // file will be saved as bucket-name/[uniquekey.csv]
    Body: pass // file data passed through the stream
  }
  new aws.S3().upload(params).promise()
    .then(() => console.log(`Successfully uploaded to S3: ${filePath}`))
    .catch(err => console.log(`Error, unable to upload to S3: ${err}`))
  return pass
}
The first thing that comes to mind would be to make the uploadS3 function async and await the upload to finish before returning the PassThrough stream. But that won't work: the function would then return a Promise, and .pipe() only accepts a stream object. (This is also why your 'finish' handler fires too early: it fires on the PassThrough as soon as the download side finishes writing into it, not when S3 has finished uploading.)
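A minimal sketch of why that first idea fails, reusing the names from the question (hypothetical code, for illustration only):

const uploadS3 = async (filePath) => {
  let pass = new stream.PassThrough()
  // awaiting here would also deadlock: upload() only finishes after data is
  // piped in, and piping can't start until this function returns the stream
  await new aws.S3().upload({Bucket: awsBucketName, Key: filePath, Body: pass}).promise()
  return pass
}

// An async function always returns a Promise, so .pipe() receives a Promise
// instead of a writable stream and throws a TypeError at runtime:
res.data.pipe(uploadS3(fileExt))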
Instead, you could refactor the code so that getGFileContent
returns a readable-stream promise.
Then, uploadS3
accepts that readable stream as a parameter and returns the S3 upload promise.
Finally, add an async backupFile
function that awaits
both the GDrive stream and the upload promise before continuing. This also keeps the functions tidy and clean, each with its own single responsibility.
Sample code:
const AWS = require('aws-sdk');
const fs = require('fs');

AWS.config.update({
  accessKeyId: '----',
  secretAccessKey: '----',
});

const s3 = new AWS.S3();

const backupFile = async (file) => {
  const fileStream = await getGFileStream(file);
  try {
    await uploadStreamToS3(fileStream);
    console.log(`S3 Backup of ${fileStream.path} completed`);
  } catch (err) {
    console.log(`error during file upload ${err}`);
  }
}

const getGFileStream = async (fileObj) => {
  // TODO: logic to find and get the file. Returns a readable-stream promise
  const fileStream = fs.createReadStream('./largeFile.zip');
  console.log(`File ${fileStream.path} read from Google Drive`);
  return fileStream;
}

const uploadStreamToS3 = (fileStream) => {
  const params = {Bucket: 'test-bucket', Key: 'key', Body: fileStream};
  console.log(`Starting to upload ${fileStream.path} to S3`);
  return s3.upload(params).promise();
}
backupFile({id: 'mockTestFile'});
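For the real Google Drive download, the TODO in getGFileStream could be filled with the same drive.files.get call the question already uses. This is a sketch assuming an authenticated googleapis drive client:

const getGFileStream = async (fileObj) => {
  const res = await drive.files.get(
    {fileId: fileObj.id, alt: 'media'},
    {responseType: 'stream'}
  );
  return res.data; // the response's data property is the readable file stream
}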
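And since the question processes many files one at a time, backupFile drops straight into a sequential loop. In this sketch, fileList stands for a hypothetical array of file objects:

const backupAll = async (fileList) => {
  for (const file of fileList) {
    // each backup resolves only after the S3 upload completes,
    // so the next file doesn't start until then
    await backupFile(file);
  }
}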