使用流 PassThrough 通过 Node/Express 将 AWS SDK 文件上传到 S3 - 文件总是损坏
AWS SDK file upload to S3 via Node/Express using stream PassThrough - file is always corrupt
这很简单。使用此代码,任何上传的图像文件都已损坏且无法打开。PDF 看起来不错,但我注意到它正在向基于文本(text-based)的文件中注入值。s3 中的文件大小是正确的,并不是出问题时常见的零字节。我不确定这是 Express、SDK 还是两者结合的问题?还是 Postman 的问题?今年 3 月,我在一个工作项目中构建了类似的东西,它运行得非常完美。我无法再访问该代码进行比较。
没有错误,没有任何问题的迹象。
const aws = require("aws-sdk");
const stream = require("stream");
const express = require("express");
const router = express.Router();
// NOTE(review): placeholder credentials hard-coded for the question —
// real code should load these from environment variables or an IAM role.
const AWS_ACCESS_KEY_ID = "XXXXXXXXXXXXXXXXXXXX";
const AWS_SECRET_ACCESS_KEY = "superSecretAccessKey";
const BUCKET_NAME = "my-bucket";
const BUCKET_REGION = "us-east-1";
// Shared S3 client used by every upload in this router.
const s3 = new aws.S3({
region: BUCKET_REGION,
accessKeyId: AWS_ACCESS_KEY_ID,
secretAccessKey: AWS_SECRET_ACCESS_KEY
});
// Starts an S3 upload whose Body is a PassThrough stream, so callers can pipe
// data in while the upload runs. Returns the writable side (streamPass) and a
// promise (streamPromise) that settles when the upload completes.
const uploadStream = key => {
  const passThrough = new stream.PassThrough();
  const uploadPromise = s3
    .upload(
      { Bucket: BUCKET_NAME, Key: key, Body: passThrough },
      (err, data) => {
        if (err) {
          console.error("ERROR: uploadStream:", err);
          return;
        }
        console.log("INFO: uploadStream:", data);
      }
    )
    .promise();
  return { streamPass: passThrough, streamPromise: uploadPromise };
};
// POST /upload — BUG (the subject of this question): the raw request body is
// piped straight to S3, so the multipart boundary lines and part headers that
// Postman adds are stored inside the object, corrupting binary files. The body
// must be parsed (multer/busboy) before uploading.
router.post("/upload", async (req, res) => {
try {
// Object key comes from the query string, e.g. /upload?file_name=foo.png
let key = req.query.file_name;
let { streamPass, streamPromise } = uploadStream(key);
// Pipes the ENTIRE request body — multipart framing included — into the bucket.
req.pipe(streamPass);
await streamPromise;
res.status(200).send({ result: "Success!" });
} catch (e) {
// NOTE(review): the error is swallowed here — nothing is logged before replying 500.
res.status(500).send({ result: "Fail!" });
}
});
module.exports = router;
这是我的 package.json:
{
"name": "expresss3streampass",
"version": "0.0.0",
"private": true,
"scripts": {
"start": "node ./bin/www"
},
"dependencies": {
"aws-sdk": "^2.812.0",
"cookie-parser": "~1.4.4",
"debug": "~2.6.9",
"express": "~4.16.1",
"morgan": "~1.9.1"
}
}
更新:
进一步测试后,我注意到 plain-text 文件正在被 Postman 更改。例如这个源文件:
{
"question_id": null,
"position_type_id": 1,
"question_category_id": 1,
"position_level_id": 1,
"question": "Do you test your code before calling it \"done\"?",
"answer": "Candidate should respond that they at least happy path test every feature and bug fix they write.",
"active": 1
}
...落入桶中后看起来像这样:
----------------------------472518836063077482836177
Content-Disposition: form-data; name="file"; filename="question.json"
Content-Type: application/json
{
"question_id": null,
"position_type_id": 1,
"question_category_id": 1,
"position_level_id": 1,
"question": "Do you test your code before calling it \"done\"?",
"answer": "Candidate should respond that they at least happy path test every feature and bug fix they write.",
"active": 1
}
----------------------------472518836063077482836177--
我不得不认为这是问题所在。 Postman 是这个等式中唯一改变的东西,从这段代码第一次为我工作时起。我的请求 header 看起来像这样:
我是最初添加“application/x-www-form-urlencoded”header的人。如果我现在使用它,我最终会在存储桶中得到一个 0 字节的文件。
据我所知,Postman 的行为符合预期——“文本注入”实际上是一种网络标准,用于 identify/demarcate 文件上传。请参阅此 MDN Web Doc as well as this one 了解原因。
无论文件类型如何,它实际上都在注入该部分:
// Debug tap: collect every chunk flowing through the PassThrough so the exact
// bytes headed for the bucket can be inspected (this is how the multipart
// framing was discovered).
let streamPass = new stream.PassThrough();
const chunks = [];
streamPass.on('data', (chunk) => chunks.push(chunk));
streamPass.on("end", () => {
  // FIX: 'body' was an undeclared assignment (implicit global) in the original.
  const body = Buffer.concat(chunks).toString();
  console.log(chunks, chunks.length);
  console.log("finished", body); // <-- see it here
});
我尝试了几种方法来控制/更改这个行为,但没有找到简单的办法 — 从 Postman 的角度来看,我认为这无法被更改;从 NodeJS 端……我的意思是它或许可以,但解决方案很可能笨重且复杂,我怀疑这不是你想要的。(虽然我可能是错的……)
鉴于上述情况,我将与@relief.melone 一起推荐 multer
作为一个简单的解决方案。
如果您想将 multer
与 streams
一起使用,试试这个:(我已经指出了我对您的代码进行更改的位置):
// const uploadStream = (key) => {
// uploadStream(key, mime_type): starts an S3 upload fed by a PassThrough and
// stores the caller-supplied MIME type on the object so S3 serves it correctly.
const uploadStream = (key, mime_type) => { // <- adding the mimetype
  const passThrough = new stream.PassThrough();
  const params = {
    Bucket: BUCKET_NAME,
    Key: key,
    Body: passThrough,
    ACL: 'public-read', // <- optional — remove if the object should stay private
    ContentType: mime_type // <- adding the mimetype
  };
  const uploadPromise = s3.upload(params, (err, data) => {
    if (err) {
      console.error("ERROR: uploadStream:", err);
    } else {
      console.log("INFO: uploadStream:", data);
    }
  }).promise();
  return { streamPass: passThrough, streamPromise: uploadPromise };
};
// router.post("/upload", async (req, res) => {
// POST /upload — multer().single('file') parses the multipart body and exposes
// the uploaded file (fully buffered in memory) as req.file.
router.post("/upload", multer().single('file'), async (req, res) => { // <- we're adding multer
  try {
    let key = req.query.file_name;
    // console.log(req.file); // <- if you want to see it, uncomment this line
    // Pass the detected MIME type through so the S3 object is tagged correctly.
    let { streamPass, streamPromise } = uploadStream(key, req.file.mimetype);
    // FIX: write the buffered bytes straight into the upload stream — the
    // original round-tripped them through a second, redundant PassThrough.
    streamPass.end(req.file.buffer);
    await streamPromise;
    res.status(200).send({ result: "Success!" });
  } catch (e) {
    console.log(e);
    res.status(500).send({ result: "Fail!" });
  }
});
Multer 是要走的路。
它提供了几种不同的模式,但据我所知,你必须编写一个自定义 storage handler 才能访问底层流,否则它将缓冲内存中的所有数据并完成后才回调。
如果您在路由处理程序中检查 req.file
,Multer 通常会在 buffer
字段下提供一个缓冲区,但它不再存在,因为我没有在回调中传递任何内容,所以我有理由相信这是按预期流式传输的。
下面是一个可行的解决方案。
注意:parse.single('image')
被传递到路由处理程序中。这是指我使用的多部分字段名称。
const aws = require('aws-sdk');
const stream = require('stream');
const express = require('express');
const router = express.Router();
const multer = require('multer')
// NOTE(review): placeholder credentials hard-coded for the answer —
// real code should load these from environment variables or an IAM role.
const AWS_ACCESS_KEY_ID = "XXXXXXXXXXXXXXXXXXXX";
const AWS_SECRET_ACCESS_KEY = "superSecretAccessKey";
const BUCKET_NAME = "my-bucket";
const BUCKET_REGION = "us-east-1";
// Shared S3 client used by the custom storage engine below.
const s3 = new aws.S3({
region: BUCKET_REGION,
accessKeyId: AWS_ACCESS_KEY_ID,
secretAccessKey: AWS_SECRET_ACCESS_KEY
});
// Kicks off an S3 upload whose Body is a PassThrough stream and hands both
// the writable stream and the completion promise back to the caller.
const uploadStream = key => {
  const passThrough = new stream.PassThrough();
  const uploadPromise = s3
    .upload({ Bucket: BUCKET_NAME, Key: key, Body: passThrough }, (err, data) => {
      if (err) {
        console.error('ERROR: uploadStream:', err);
        return;
      }
      console.log('INFO: uploadStream:', data);
    })
    .promise();
  return { streamPass: passThrough, streamPromise: uploadPromise };
};
// Minimal Multer storage engine that streams the incoming file part to S3
// instead of buffering it on disk or in memory.
class CustomStorage {
  // Called by Multer with the parsed file part; must invoke cb exactly once.
  _handleFile(req, file, cb) {
    const key = req.query.file_name;
    const { streamPass, streamPromise } = uploadStream(key);
    file.stream.pipe(streamPass);
    // FIX: propagate upload failures — the original dropped rejections,
    // leaving the request hanging whenever S3 errored.
    streamPromise.then(() => cb(null, {}), (err) => cb(err));
  }

  // Required by the storage-engine contract (used on rollback); nothing to
  // clean up since we never wrote locally.
  _removeFile(req, file, cb) {
    cb(null);
  }
}
// Wire the custom storage engine into Multer; 'image' is the multipart field name.
const storage = new CustomStorage();
const parse = multer({storage});
// By the time this handler runs, CustomStorage has already streamed the file to S3.
router.post('/upload', parse.single('image'), async (req, res) => {
try {
res.status(200).send({ result: 'Success!' });
} catch (e) {
console.log(e)
res.status(500).send({ result: 'Fail!' });
}
});
module.exports = router;
更新:更好的解决方案
我上面提供的基于 Multer 的解决方案有点老套。所以我深入了解了 它是如何工作的 。此解决方案仅使用 Busboy 来解析和流式传输文件。 Multer 实际上只是一个包装器,带有一些磁盘 I/O 便利功能。
const aws = require('aws-sdk');
const express = require('express');
const Busboy = require('busboy');
const router = express.Router();
// NOTE(review): placeholder credentials hard-coded for the answer —
// real code should load these from environment variables or an IAM role.
const AWS_ACCESS_KEY_ID = "XXXXXXXXXXXXXXXXXXXX";
const AWS_SECRET_ACCESS_KEY = "superSecretAccessKey";
const BUCKET_NAME = "my-bucket";
const BUCKET_REGION = "us-east-1";
// Shared S3 client used by multipart() below.
const s3 = new aws.S3({
region: BUCKET_REGION,
accessKeyId: AWS_ACCESS_KEY_ID,
secretAccessKey: AWS_SECRET_ACCESS_KEY
});
// Parses a multipart/form-data request with Busboy and streams each file part
// directly to S3, resolving when the upload completes.
function multipart(request) {
  // FIX: the original used an async executor (`new Promise(async ...)`), which
  // silently swallows synchronous throws; a plain executor is safer.
  return new Promise((resolve, reject) => {
    const busboy = new Busboy({ headers: request.headers });
    // you may need to add cleanup logic using 'busboy.on' events
    busboy.on('error', (err) => reject(err));
    busboy.on('file', function (fieldName, fileStream, fileName, encoding, mimeType) {
      const params = {
        Bucket: BUCKET_NAME,
        Key: fileName,
        Body: fileStream
      };
      // FIX: reject (instead of hanging forever) when the S3 upload itself fails.
      s3.upload(params).promise().then(resolve, reject);
    });
    request.pipe(busboy);
  });
}
// POST /upload — streams the multipart body to S3 and reports the outcome.
router.post('/upload', async (req, res) => {
  let status = 200;
  let payload = { result: 'Success!' };
  try {
    await multipart(req);
  } catch (e) {
    console.log(e);
    status = 500;
    payload = { result: 'Fail!' };
  }
  res.status(status).send(payload);
});

module.exports = router;
这很简单。使用此代码,任何上传的图像文件都已损坏且无法打开。PDF 看起来不错,但我注意到它正在向基于文本(text-based)的文件中注入值。s3 中的文件大小是正确的,并不是出问题时常见的零字节。我不确定这是 Express、SDK 还是两者结合的问题?还是 Postman 的问题?今年 3 月,我在一个工作项目中构建了类似的东西,它运行得非常完美。我无法再访问该代码进行比较。
没有错误,没有任何问题的迹象。
const aws = require("aws-sdk");
const stream = require("stream");
const express = require("express");
const router = express.Router();
// NOTE(review): placeholder credentials hard-coded for the question —
// real code should load these from environment variables or an IAM role.
const AWS_ACCESS_KEY_ID = "XXXXXXXXXXXXXXXXXXXX";
const AWS_SECRET_ACCESS_KEY = "superSecretAccessKey";
const BUCKET_NAME = "my-bucket";
const BUCKET_REGION = "us-east-1";
// Shared S3 client used by every upload in this router.
const s3 = new aws.S3({
region: BUCKET_REGION,
accessKeyId: AWS_ACCESS_KEY_ID,
secretAccessKey: AWS_SECRET_ACCESS_KEY
});
// Starts an S3 upload whose Body is a PassThrough stream, so callers can pipe
// data in while the upload runs. Returns the writable side (streamPass) and a
// promise (streamPromise) that settles when the upload completes.
const uploadStream = key => {
  const passThrough = new stream.PassThrough();
  const uploadPromise = s3
    .upload(
      { Bucket: BUCKET_NAME, Key: key, Body: passThrough },
      (err, data) => {
        if (err) {
          console.error("ERROR: uploadStream:", err);
          return;
        }
        console.log("INFO: uploadStream:", data);
      }
    )
    .promise();
  return { streamPass: passThrough, streamPromise: uploadPromise };
};
// POST /upload — BUG (the subject of this question): the raw request body is
// piped straight to S3, so the multipart boundary lines and part headers that
// Postman adds are stored inside the object, corrupting binary files. The body
// must be parsed (multer/busboy) before uploading.
router.post("/upload", async (req, res) => {
try {
// Object key comes from the query string, e.g. /upload?file_name=foo.png
let key = req.query.file_name;
let { streamPass, streamPromise } = uploadStream(key);
// Pipes the ENTIRE request body — multipart framing included — into the bucket.
req.pipe(streamPass);
await streamPromise;
res.status(200).send({ result: "Success!" });
} catch (e) {
// NOTE(review): the error is swallowed here — nothing is logged before replying 500.
res.status(500).send({ result: "Fail!" });
}
});
module.exports = router;
这是我的 package.json:
{
"name": "expresss3streampass",
"version": "0.0.0",
"private": true,
"scripts": {
"start": "node ./bin/www"
},
"dependencies": {
"aws-sdk": "^2.812.0",
"cookie-parser": "~1.4.4",
"debug": "~2.6.9",
"express": "~4.16.1",
"morgan": "~1.9.1"
}
}
更新:
进一步测试后,我注意到 plain-text 文件正在被 Postman 更改。例如这个源文件:
{
"question_id": null,
"position_type_id": 1,
"question_category_id": 1,
"position_level_id": 1,
"question": "Do you test your code before calling it \"done\"?",
"answer": "Candidate should respond that they at least happy path test every feature and bug fix they write.",
"active": 1
}
...落入桶中后看起来像这样:
----------------------------472518836063077482836177
Content-Disposition: form-data; name="file"; filename="question.json"
Content-Type: application/json
{
"question_id": null,
"position_type_id": 1,
"question_category_id": 1,
"position_level_id": 1,
"question": "Do you test your code before calling it \"done\"?",
"answer": "Candidate should respond that they at least happy path test every feature and bug fix they write.",
"active": 1
}
----------------------------472518836063077482836177--
我不得不认为这是问题所在。 Postman 是这个等式中唯一改变的东西,从这段代码第一次为我工作时起。我的请求 header 看起来像这样:
我是最初添加“application/x-www-form-urlencoded”header的人。如果我现在使用它,我最终会在存储桶中得到一个 0 字节的文件。
据我所知,Postman 的行为符合预期——“文本注入”实际上是一种网络标准,用于 identify/demarcate 文件上传。请参阅此 MDN Web Doc as well as this one 了解原因。
无论文件类型如何,它实际上都在注入该部分:
// Debug tap: collect every chunk flowing through the PassThrough so the exact
// bytes headed for the bucket can be inspected (this is how the multipart
// framing was discovered).
let streamPass = new stream.PassThrough();
const chunks = [];
streamPass.on('data', (chunk) => chunks.push(chunk));
streamPass.on("end", () => {
  // FIX: 'body' was an undeclared assignment (implicit global) in the original.
  const body = Buffer.concat(chunks).toString();
  console.log(chunks, chunks.length);
  console.log("finished", body); // <-- see it here
});
我尝试了几种方法来控制/更改这个行为,但没有找到简单的办法 — 从 Postman 的角度来看,我认为这无法被更改;从 NodeJS 端……我的意思是它或许可以,但解决方案很可能笨重且复杂,我怀疑这不是你想要的。(虽然我可能是错的……)
鉴于上述情况,我将与@relief.melone 一起推荐 multer
作为一个简单的解决方案。
如果您想将 multer
与 streams
一起使用,试试这个:(我已经指出了我对您的代码进行更改的位置):
// const uploadStream = (key) => {
// uploadStream(key, mime_type): starts an S3 upload fed by a PassThrough and
// stores the caller-supplied MIME type on the object so S3 serves it correctly.
const uploadStream = (key, mime_type) => { // <- adding the mimetype
  const passThrough = new stream.PassThrough();
  const params = {
    Bucket: BUCKET_NAME,
    Key: key,
    Body: passThrough,
    ACL: 'public-read', // <- optional — remove if the object should stay private
    ContentType: mime_type // <- adding the mimetype
  };
  const uploadPromise = s3.upload(params, (err, data) => {
    if (err) {
      console.error("ERROR: uploadStream:", err);
    } else {
      console.log("INFO: uploadStream:", data);
    }
  }).promise();
  return { streamPass: passThrough, streamPromise: uploadPromise };
};
// router.post("/upload", async (req, res) => {
// POST /upload — multer().single('file') parses the multipart body and exposes
// the uploaded file (fully buffered in memory) as req.file.
router.post("/upload", multer().single('file'), async (req, res) => { // <- we're adding multer
  try {
    let key = req.query.file_name;
    // console.log(req.file); // <- if you want to see it, uncomment this line
    // Pass the detected MIME type through so the S3 object is tagged correctly.
    let { streamPass, streamPromise } = uploadStream(key, req.file.mimetype);
    // FIX: write the buffered bytes straight into the upload stream — the
    // original round-tripped them through a second, redundant PassThrough.
    streamPass.end(req.file.buffer);
    await streamPromise;
    res.status(200).send({ result: "Success!" });
  } catch (e) {
    console.log(e);
    res.status(500).send({ result: "Fail!" });
  }
});
Multer 是要走的路。
它提供了几种不同的模式,但据我所知,你必须编写一个自定义 storage handler 才能访问底层流,否则它将缓冲内存中的所有数据并完成后才回调。
如果您在路由处理程序中检查 req.file
,Multer 通常会在 buffer
字段下提供一个缓冲区,但它不再存在,因为我没有在回调中传递任何内容,所以我有理由相信这是按预期流式传输的。
下面是一个可行的解决方案。
注意:parse.single('image')
被传递到路由处理程序中。这是指我使用的多部分字段名称。
const aws = require('aws-sdk');
const stream = require('stream');
const express = require('express');
const router = express.Router();
const multer = require('multer')
// NOTE(review): placeholder credentials hard-coded for the answer —
// real code should load these from environment variables or an IAM role.
const AWS_ACCESS_KEY_ID = "XXXXXXXXXXXXXXXXXXXX";
const AWS_SECRET_ACCESS_KEY = "superSecretAccessKey";
const BUCKET_NAME = "my-bucket";
const BUCKET_REGION = "us-east-1";
// Shared S3 client used by the custom storage engine below.
const s3 = new aws.S3({
region: BUCKET_REGION,
accessKeyId: AWS_ACCESS_KEY_ID,
secretAccessKey: AWS_SECRET_ACCESS_KEY
});
// Kicks off an S3 upload whose Body is a PassThrough stream and hands both
// the writable stream and the completion promise back to the caller.
const uploadStream = key => {
  const passThrough = new stream.PassThrough();
  const uploadPromise = s3
    .upload({ Bucket: BUCKET_NAME, Key: key, Body: passThrough }, (err, data) => {
      if (err) {
        console.error('ERROR: uploadStream:', err);
        return;
      }
      console.log('INFO: uploadStream:', data);
    })
    .promise();
  return { streamPass: passThrough, streamPromise: uploadPromise };
};
// Minimal Multer storage engine that streams the incoming file part to S3
// instead of buffering it on disk or in memory.
class CustomStorage {
  // Called by Multer with the parsed file part; must invoke cb exactly once.
  _handleFile(req, file, cb) {
    const key = req.query.file_name;
    const { streamPass, streamPromise } = uploadStream(key);
    file.stream.pipe(streamPass);
    // FIX: propagate upload failures — the original dropped rejections,
    // leaving the request hanging whenever S3 errored.
    streamPromise.then(() => cb(null, {}), (err) => cb(err));
  }

  // Required by the storage-engine contract (used on rollback); nothing to
  // clean up since we never wrote locally.
  _removeFile(req, file, cb) {
    cb(null);
  }
}
// Wire the custom storage engine into Multer; 'image' is the multipart field name.
const storage = new CustomStorage();
const parse = multer({storage});
// By the time this handler runs, CustomStorage has already streamed the file to S3.
router.post('/upload', parse.single('image'), async (req, res) => {
try {
res.status(200).send({ result: 'Success!' });
} catch (e) {
console.log(e)
res.status(500).send({ result: 'Fail!' });
}
});
module.exports = router;
更新:更好的解决方案
我上面提供的基于 Multer 的解决方案有点老套。所以我深入了解了 它是如何工作的 。此解决方案仅使用 Busboy 来解析和流式传输文件。 Multer 实际上只是一个包装器,带有一些磁盘 I/O 便利功能。
const aws = require('aws-sdk');
const express = require('express');
const Busboy = require('busboy');
const router = express.Router();
// NOTE(review): placeholder credentials hard-coded for the answer —
// real code should load these from environment variables or an IAM role.
const AWS_ACCESS_KEY_ID = "XXXXXXXXXXXXXXXXXXXX";
const AWS_SECRET_ACCESS_KEY = "superSecretAccessKey";
const BUCKET_NAME = "my-bucket";
const BUCKET_REGION = "us-east-1";
// Shared S3 client used by multipart() below.
const s3 = new aws.S3({
region: BUCKET_REGION,
accessKeyId: AWS_ACCESS_KEY_ID,
secretAccessKey: AWS_SECRET_ACCESS_KEY
});
// Parses a multipart/form-data request with Busboy and streams each file part
// directly to S3, resolving when the upload completes.
function multipart(request) {
  // FIX: the original used an async executor (`new Promise(async ...)`), which
  // silently swallows synchronous throws; a plain executor is safer.
  return new Promise((resolve, reject) => {
    const busboy = new Busboy({ headers: request.headers });
    // you may need to add cleanup logic using 'busboy.on' events
    busboy.on('error', (err) => reject(err));
    busboy.on('file', function (fieldName, fileStream, fileName, encoding, mimeType) {
      const params = {
        Bucket: BUCKET_NAME,
        Key: fileName,
        Body: fileStream
      };
      // FIX: reject (instead of hanging forever) when the S3 upload itself fails.
      s3.upload(params).promise().then(resolve, reject);
    });
    request.pipe(busboy);
  });
}
// POST /upload — streams the multipart body to S3 and reports the outcome.
router.post('/upload', async (req, res) => {
  let status = 200;
  let payload = { result: 'Success!' };
  try {
    await multipart(req);
  } catch (e) {
    console.log(e);
    status = 500;
    payload = { result: 'Fail!' };
  }
  res.status(status).send(payload);
});

module.exports = router;