通过管道将流传输到 s3.upload()
Pipe a stream to s3.upload()
我目前正在使用一个名为 s3-upload-stream 的 node.js 插件将非常大的文件流式传输到 Amazon S3。它使用多部分 API 并且在大多数情况下效果很好。
但是,这个模块显示出它的年龄,我已经不得不对其进行修改(作者也已弃用它)。今天我运行又遇到了亚马逊的一个issue,很想听听作者的推荐,开始使用官方的aws-sdk来完成我的上传。
但是。
官方 SDK 似乎不支持管道到 s3.upload()
。 s3.upload 的本质是您必须将可读流作为参数传递给 S3 构造函数。
我有大约 120 多个执行各种文件处理的用户代码模块,它们不知道输出的最终目的地。引擎将一个可管道化的可写输出流交给他们,然后他们通过管道传输给它。我不能给他们一个 AWS.S3
对象并要求他们在不向所有模块添加代码的情况下对其调用 upload()
。我使用 s3-upload-stream
的原因是因为它支持管道。
有没有办法制作 aws-sdk s3.upload()
我可以将流传输到的东西?
如果您知道流的大小,您可以使用 minio-js 像这样上传流:
s3Client.putObject('my-bucketname', 'my-objectname.ogg', stream, size, 'audio/ogg', function(e) {
if (e) {
return console.log(e)
}
console.log("Successfully uploaded the stream")
})
用 node.js stream.PassThrough()
流包装 S3 upload()
函数。
这是一个例子:
inputStream
.pipe(uploadFromStream(s3));
function uploadFromStream(s3) {
var pass = new stream.PassThrough();
var params = {Bucket: BUCKET, Key: KEY, Body: pass};
s3.upload(params, function(err, data) {
console.log(err, data);
});
return pass;
}
如果它能帮助我成功从客户端流式传输到 s3 的任何人:
https://gist.github.com/mattlockyer/532291b6194f6d9ca40cb82564db9d2a
服务器端代码假设 req
是一个流 object,在我的例子中它是从客户端发送的,文件信息设置在 headers.
const fileUploadStream = (req, res) => {
//get "body" args from header
const { id, fn } = JSON.parse(req.get('body'));
const Key = id + '/' + fn; //upload to s3 folder "id" with filename === fn
const params = {
Key,
Bucket: bucketName, //set somewhere
Body: req, //req is a stream
};
s3.upload(params, (err, data) => {
if (err) {
res.send('Error Uploading Data: ' + JSON.stringify(err) + '\n' + JSON.stringify(err.stack));
} else {
res.send(Key);
}
});
};
是的,它打破了惯例,但如果你看一下要点,它比我发现的使用 multer、busboy 等的任何其他东西都干净得多...
+1 实用主义,感谢@SalehenRahman 的帮助。
在接受的答案中,函数在上传完成之前结束,因此是不正确的。下面的代码从可读流中正确传输。
async function uploadReadableStream(stream) {
const params = {Bucket: bucket, Key: key, Body: stream};
return s3.upload(params).promise();
}
async function upload() {
const readable = getSomeReadableStream();
const results = await uploadReadableStream(readable);
console.log('upload complete', results);
}
您还可以更进一步,使用 ManagedUpload
输出进度信息:
const manager = s3.upload(params);
manager.on('httpUploadProgress', (progress) => {
console.log('progress', progress) // { loaded: 4915, total: 192915, part: 1, key: 'foo.jpg' }
});
回答有点晚,希望它能对其他人有所帮助。您可以 return 可写流和 promise,这样您就可以在上传完成时获得响应数据。
const AWS = require('aws-sdk');
const stream = require('stream');
const uploadStream = ({ Bucket, Key }) => {
const s3 = new AWS.S3();
const pass = new stream.PassThrough();
return {
writeStream: pass,
promise: s3.upload({ Bucket, Key, Body: pass }).promise(),
};
}
并且您可以使用如下函数:
const { writeStream, promise } = uploadStream({Bucket: 'yourbucket', Key: 'yourfile.mp4'});
const readStream = fs.createReadStream('/path/to/yourfile.mp4');
const pipeline = readStream.pipe(writeStream);
现在您可以检查承诺:
promise.then(() => {
console.log('upload completed successfully');
}).catch((err) => {
console.log('upload failed.', err.message);
});
或使用async/await:
try {
await promise;
console.log('upload completed successfully');
} catch (error) {
console.log('upload failed.', error.message);
}
或者如stream.pipe()
returns stream.Writable,目的地(上面的writeStream变量),允许管道链,我们也可以使用它的事件:
pipeline.on('close', () => {
console.log('upload successful');
});
pipeline.on('error', (err) => {
console.log('upload failed', err.message)
});
Type Script 解决方案:
此示例使用:
import * as AWS from "aws-sdk";
import * as fsExtra from "fs-extra";
import * as zlib from "zlib";
import * as stream from "stream";
和异步函数:
public async saveFile(filePath: string, s3Bucket: AWS.S3, key: string, bucketName: string): Promise<boolean> {
const uploadStream = (S3: AWS.S3, Bucket: string, Key: string) => {
const passT = new stream.PassThrough();
return {
writeStream: passT,
promise: S3.upload({ Bucket, Key, Body: passT }).promise(),
};
};
const { writeStream, promise } = uploadStream(s3Bucket, bucketName, key);
fsExtra.createReadStream(filePath).pipe(writeStream); // NOTE: Addition You can compress to zip by .pipe(zlib.createGzip()).pipe(writeStream)
let output = true;
await promise.catch((reason)=> { output = false; console.log(reason);});
return output;
}
在某处调用此方法,例如:
let result = await saveFileToS3(testFilePath, someS3Bucket, someKey, someBucketName);
对于那些抱怨当他们使用 s3 api 上传功能并且零字节文件最终出现在 s3 上的人(@Radar155 和 @gabo)- 我也遇到了这个问题。
创建第二个 PassThrough 流,并将所有数据从第一个流传输到第二个流,并将对第二个流的引用传递给 s3。您可以通过几种不同的方式执行此操作 - 可能一种肮脏的方式是在第一个流上侦听 "data" 事件,然后将相同的数据写入第二个流 - "end" 也是如此事件 - 只需在第二个流上调用结束函数。我不知道这是否是 aws api、节点版本或其他问题中的错误 - 但它解决了我的问题。
它可能是这样的:
var PassThroughStream = require('stream').PassThrough;
var srcStream = new PassThroughStream();
var rstream = fs.createReadStream('Learning/stocktest.json');
var sameStream = rstream.pipe(srcStream);
// interesting note: (srcStream == sameStream) at this point
var destStream = new PassThroughStream();
// call your s3.upload function here - passing in the destStream as the Body parameter
srcStream.on('data', function (chunk) {
destStream.write(chunk);
});
srcStream.on('end', function () {
dataStream.end();
});
我正在使用 KnexJS,但在使用他们的流媒体时遇到了问题 API。我终于修好了,希望以下内容对某人有所帮助。
const knexStream = knex.select('*').from('my_table').stream();
const passThroughStream = new stream.PassThrough();
knexStream.on('data', (chunk) => passThroughStream.write(JSON.stringify(chunk) + '\n'));
knexStream.on('end', () => passThroughStream.end());
const uploadResult = await s3
.upload({
Bucket: 'my-bucket',
Key: 'stream-test.txt',
Body: passThroughStream
})
.promise();
None 个答案对我有用,因为我想:
- 输入
s3.upload()
- 将
s3.upload()
的结果通过管道传输到另一个流中
接受的答案不支持后者。其他依赖于 promise api,这在使用流管道时工作起来很麻烦。
这是我对已接受答案的修改。
const s3 = new S3();
function writeToS3({Key, Bucket}) {
const Body = new stream.PassThrough();
s3.upload({
Body,
Key,
Bucket: process.env.adpBucket
})
.on('httpUploadProgress', progress => {
console.log('progress', progress);
})
.send((err, data) => {
if (err) {
Body.destroy(err);
} else {
console.log(`File uploaded and available at ${data.Location}`);
Body.destroy();
}
});
return Body;
}
const pipeline = myReadableStream.pipe(writeToS3({Key, Bucket});
pipeline.on('close', () => {
// upload finished, do something else
})
pipeline.on('error', () => {
// upload wasn't successful. Handle it
})
在上面最被接受的答案中要注意的是:
如果你使用管道,你需要 return 在函数中传递,
fs.createReadStream(<filePath>).pipe(anyUploadFunction())
function anyUploadFunction () {
let pass = new stream.PassThrough();
return pass // <- Returning this pass is important for the stream to understand where it needs to write to.
}
否则它会静静地移动到下一个而不会抛出错误,或者会抛出 TypeError: dest.on is not a function
的错误,具体取决于您编写函数的方式
按照其他答案并为 Node.js 使用最新的 AWS SDK,有一个更清晰和更简单的解决方案,因为 s3 upload() 函数接受流,使用 await 语法和 S3 的承诺:
var model = await s3Client.upload({
Bucket : bucket,
Key : key,
ContentType : yourContentType,
Body : fs.createReadStream(path-to-file)
}).promise();
如果您使用的是 AWS 节点 SDK v3,则有用于上传的专用模块 streams/blobs/buffers。
创建一个 new stream.PassThrough()
和 pipe
输入流,然后将 passthrough 实例传递给 body。
检查以下示例:
function upload(s3, inputStream) {
const pass = new PassThrough();
inputStream.pipe(pass);
return s3.upload(
{
Bucket: 'bucket name',
Key: 'unique file name',
Body: pass,
},
{
queueSize: 4, // default concurrency
},
).promise()
.then((data) => console.log(data))
.catch((error) => console.error(error));
}
我目前正在使用一个名为 s3-upload-stream 的 node.js 插件将非常大的文件流式传输到 Amazon S3。它使用多部分 API 并且在大多数情况下效果很好。
但是,这个模块显示出它的年龄,我已经不得不对其进行修改(作者也已弃用它)。今天我运行又遇到了亚马逊的一个issue,很想听听作者的推荐,开始使用官方的aws-sdk来完成我的上传。
但是。
官方 SDK 似乎不支持管道到 s3.upload()
。 s3.upload 的本质是您必须将可读流作为参数传递给 S3 构造函数。
我有大约 120 多个执行各种文件处理的用户代码模块,它们不知道输出的最终目的地。引擎将一个可管道化的可写输出流交给他们,然后他们通过管道传输给它。我不能给他们一个 AWS.S3
对象并要求他们在不向所有模块添加代码的情况下对其调用 upload()
。我使用 s3-upload-stream
的原因是因为它支持管道。
有没有办法制作 aws-sdk s3.upload()
我可以将流传输到的东西?
如果您知道流的大小,您可以使用 minio-js 像这样上传流:
s3Client.putObject('my-bucketname', 'my-objectname.ogg', stream, size, 'audio/ogg', function(e) {
if (e) {
return console.log(e)
}
console.log("Successfully uploaded the stream")
})
用 node.js stream.PassThrough()
流包装 S3 upload()
函数。
这是一个例子:
inputStream
.pipe(uploadFromStream(s3));
function uploadFromStream(s3) {
var pass = new stream.PassThrough();
var params = {Bucket: BUCKET, Key: KEY, Body: pass};
s3.upload(params, function(err, data) {
console.log(err, data);
});
return pass;
}
如果它能帮助我成功从客户端流式传输到 s3 的任何人:
https://gist.github.com/mattlockyer/532291b6194f6d9ca40cb82564db9d2a
服务器端代码假设 req
是一个流 object,在我的例子中它是从客户端发送的,文件信息设置在 headers.
const fileUploadStream = (req, res) => {
//get "body" args from header
const { id, fn } = JSON.parse(req.get('body'));
const Key = id + '/' + fn; //upload to s3 folder "id" with filename === fn
const params = {
Key,
Bucket: bucketName, //set somewhere
Body: req, //req is a stream
};
s3.upload(params, (err, data) => {
if (err) {
res.send('Error Uploading Data: ' + JSON.stringify(err) + '\n' + JSON.stringify(err.stack));
} else {
res.send(Key);
}
});
};
是的,它打破了惯例,但如果你看一下要点,它比我发现的使用 multer、busboy 等的任何其他东西都干净得多...
+1 实用主义,感谢@SalehenRahman 的帮助。
在接受的答案中,函数在上传完成之前结束,因此是不正确的。下面的代码从可读流中正确传输。
async function uploadReadableStream(stream) {
const params = {Bucket: bucket, Key: key, Body: stream};
return s3.upload(params).promise();
}
async function upload() {
const readable = getSomeReadableStream();
const results = await uploadReadableStream(readable);
console.log('upload complete', results);
}
您还可以更进一步,使用 ManagedUpload
输出进度信息:
const manager = s3.upload(params);
manager.on('httpUploadProgress', (progress) => {
console.log('progress', progress) // { loaded: 4915, total: 192915, part: 1, key: 'foo.jpg' }
});
回答有点晚,希望它能对其他人有所帮助。您可以 return 可写流和 promise,这样您就可以在上传完成时获得响应数据。
const AWS = require('aws-sdk');
const stream = require('stream');
const uploadStream = ({ Bucket, Key }) => {
const s3 = new AWS.S3();
const pass = new stream.PassThrough();
return {
writeStream: pass,
promise: s3.upload({ Bucket, Key, Body: pass }).promise(),
};
}
并且您可以使用如下函数:
const { writeStream, promise } = uploadStream({Bucket: 'yourbucket', Key: 'yourfile.mp4'});
const readStream = fs.createReadStream('/path/to/yourfile.mp4');
const pipeline = readStream.pipe(writeStream);
现在您可以检查承诺:
promise.then(() => {
console.log('upload completed successfully');
}).catch((err) => {
console.log('upload failed.', err.message);
});
或使用async/await:
try {
await promise;
console.log('upload completed successfully');
} catch (error) {
console.log('upload failed.', error.message);
}
或者如stream.pipe()
returns stream.Writable,目的地(上面的writeStream变量),允许管道链,我们也可以使用它的事件:
pipeline.on('close', () => {
console.log('upload successful');
});
pipeline.on('error', (err) => {
console.log('upload failed', err.message)
});
Type Script 解决方案:
此示例使用:
import * as AWS from "aws-sdk";
import * as fsExtra from "fs-extra";
import * as zlib from "zlib";
import * as stream from "stream";
和异步函数:
public async saveFile(filePath: string, s3Bucket: AWS.S3, key: string, bucketName: string): Promise<boolean> {
const uploadStream = (S3: AWS.S3, Bucket: string, Key: string) => {
const passT = new stream.PassThrough();
return {
writeStream: passT,
promise: S3.upload({ Bucket, Key, Body: passT }).promise(),
};
};
const { writeStream, promise } = uploadStream(s3Bucket, bucketName, key);
fsExtra.createReadStream(filePath).pipe(writeStream); // NOTE: Addition You can compress to zip by .pipe(zlib.createGzip()).pipe(writeStream)
let output = true;
await promise.catch((reason)=> { output = false; console.log(reason);});
return output;
}
在某处调用此方法,例如:
let result = await saveFileToS3(testFilePath, someS3Bucket, someKey, someBucketName);
对于那些抱怨当他们使用 s3 api 上传功能并且零字节文件最终出现在 s3 上的人(@Radar155 和 @gabo)- 我也遇到了这个问题。
创建第二个 PassThrough 流,并将所有数据从第一个流传输到第二个流,并将对第二个流的引用传递给 s3。您可以通过几种不同的方式执行此操作 - 可能一种肮脏的方式是在第一个流上侦听 "data" 事件,然后将相同的数据写入第二个流 - "end" 也是如此事件 - 只需在第二个流上调用结束函数。我不知道这是否是 aws api、节点版本或其他问题中的错误 - 但它解决了我的问题。
它可能是这样的:
var PassThroughStream = require('stream').PassThrough;
var srcStream = new PassThroughStream();
var rstream = fs.createReadStream('Learning/stocktest.json');
var sameStream = rstream.pipe(srcStream);
// interesting note: (srcStream == sameStream) at this point
var destStream = new PassThroughStream();
// call your s3.upload function here - passing in the destStream as the Body parameter
srcStream.on('data', function (chunk) {
destStream.write(chunk);
});
srcStream.on('end', function () {
dataStream.end();
});
我正在使用 KnexJS,但在使用他们的流媒体时遇到了问题 API。我终于修好了,希望以下内容对某人有所帮助。
const knexStream = knex.select('*').from('my_table').stream();
const passThroughStream = new stream.PassThrough();
knexStream.on('data', (chunk) => passThroughStream.write(JSON.stringify(chunk) + '\n'));
knexStream.on('end', () => passThroughStream.end());
const uploadResult = await s3
.upload({
Bucket: 'my-bucket',
Key: 'stream-test.txt',
Body: passThroughStream
})
.promise();
None 个答案对我有用,因为我想:
- 输入
s3.upload()
- 将
s3.upload()
的结果通过管道传输到另一个流中
接受的答案不支持后者。其他依赖于 promise api,这在使用流管道时工作起来很麻烦。
这是我对已接受答案的修改。
const s3 = new S3();
function writeToS3({Key, Bucket}) {
const Body = new stream.PassThrough();
s3.upload({
Body,
Key,
Bucket: process.env.adpBucket
})
.on('httpUploadProgress', progress => {
console.log('progress', progress);
})
.send((err, data) => {
if (err) {
Body.destroy(err);
} else {
console.log(`File uploaded and available at ${data.Location}`);
Body.destroy();
}
});
return Body;
}
const pipeline = myReadableStream.pipe(writeToS3({Key, Bucket});
pipeline.on('close', () => {
// upload finished, do something else
})
pipeline.on('error', () => {
// upload wasn't successful. Handle it
})
在上面最被接受的答案中要注意的是: 如果你使用管道,你需要 return 在函数中传递,
fs.createReadStream(<filePath>).pipe(anyUploadFunction())
function anyUploadFunction () {
let pass = new stream.PassThrough();
return pass // <- Returning this pass is important for the stream to understand where it needs to write to.
}
否则它会静静地移动到下一个而不会抛出错误,或者会抛出 TypeError: dest.on is not a function
的错误,具体取决于您编写函数的方式
按照其他答案并为 Node.js 使用最新的 AWS SDK,有一个更清晰和更简单的解决方案,因为 s3 upload() 函数接受流,使用 await 语法和 S3 的承诺:
var model = await s3Client.upload({
Bucket : bucket,
Key : key,
ContentType : yourContentType,
Body : fs.createReadStream(path-to-file)
}).promise();
如果您使用的是 AWS 节点 SDK v3,则有用于上传的专用模块 streams/blobs/buffers。
创建一个 new stream.PassThrough()
和 pipe
输入流,然后将 passthrough 实例传递给 body。
检查以下示例:
function upload(s3, inputStream) {
const pass = new PassThrough();
inputStream.pipe(pass);
return s3.upload(
{
Bucket: 'bucket name',
Key: 'unique file name',
Body: pass,
},
{
queueSize: 4, // default concurrency
},
).promise()
.then((data) => console.log(data))
.catch((error) => console.error(error));
}