使用 busboy 在 Node.js 中实现可恢复（断点续传）的文件上传
Node.js resumable file upload using busboy
我正在使用 express 和 busboy 实现一个上传端点，这段代码运行得很好。现在我想实现可恢复上传（断点续传）的逻辑：例如，用户正在上传 10 GB 或 50 GB 的文件时网络断开，下次再上传同一个文件时，应该能从中断的位置继续上传。
我知道我需要实现另一个端点,它应该告诉客户端到目前为止已经上传了多少字节,以便客户端可以发送剩余的字节。
我不确定接下来该怎么做。我遇到的第一个问题是：上传过程中，express 会把临时文件写到操作系统的 tmp 目录。能否让临时文件改为保存在当前脚本所在的目录中？
以下是我的上传端点代码。
// Stream each uploaded file part to a file in this script's directory and reply with
// the file name once it has been written.
router.post("/upload", (req, res, next) => {
  const busboy = new Busboy({ headers: req.headers });
  busboy.on("file", (fieldname, file, filename) => {
    // Keep only the last path segment of the client-supplied name; otherwise a name
    // such as "../../server.js" would be written outside __dirname.
    const filepath = path.join(__dirname, path.basename(filename));
    const writeStream = fs.createWriteStream(filepath);
    let failed = false;
    writeStream.on("error", (err) => {
      failed = true;
      file.unpipe(writeStream);
      file.resume(); // drain the part so busboy can finish parsing
      next(err);
    });
    file.pipe(writeStream);
    writeStream.on("close", () => {
      // 'close' can also fire after an error, which has already been passed to next().
      if (!failed) {
        res.send(filename);
      }
    });
  });
  req.pipe(busboy);
});
回答我自己的问题。
我已经摸索出了一个解决方案。基本思路是：从文件上次中断的位置继续上传。我不确定这是否是最好的处理方式，但对我来说是可行的。
server.js
const express = require("express");
const busboy = require("connect-busboy");
const path = require("path");
const fs = require("fs");
const cors = require("cors");
const app = express(); // Initialize the express web server
app.use(cors());
// Insert the busboy middleware exactly once. A second bare `app.use(busboy())` was
// removed: at best it is a no-op, at worst it replaces the configured instance and
// drops the highWaterMark setting.
app.use(
  busboy({
    highWaterMark: 2 * 1024 * 1024, // Set 2MiB buffer
  })
);
const uploadPath = path.join(__dirname, "upload_data");
// fs.createWriteStream fails with ENOENT if the target directory does not exist.
fs.mkdirSync(uploadPath, { recursive: true });
// In-memory map of x-file-id -> stored file name (lost when the process restarts).
// Null prototype so client-chosen ids like "__proto__" or "constructor" cannot resolve
// to inherited Object.prototype members.
const database = Object.create(null);
// Tell the client how many bytes of the file identified by :id are already stored, so it
// can resume from there. Responds { bytes: 0 } for an unknown id or when the stored file
// cannot be read (meaning the upload starts from scratch).
app.route("/:id").get((req, res, next) => {
  const fileId = req.params.id;
  const dbFileName = database[fileId];
  if (!dbFileName) {
    return res.json({ bytes: 0 });
  }
  try {
    // statSync reads the size without opening a descriptor (openSync leaked one per call).
    const { size } = fs.statSync(path.join(uploadPath, dbFileName));
    return res.json({ bytes: size });
  } catch (error) {
    console.error(error);
    return res.json({ bytes: 0 });
  }
});
// Handle the upload post request.
// Expects a multipart/form-data body with one file part and these headers:
//   x-file-id    - client-chosen id used to find an earlier partial upload
//   x-start-byte - offset in the file at which this body's data starts (0 = new upload)
//   x-file-size  - total size of the file
// Replies only after busboy has parsed the whole body AND every write stream has closed,
// so a following GET /:id sees all bytes of this request on disk.
app.route("/upload").post((req, res, next) => {
  const xFileId = req.headers["x-file-id"];
  const parsedStartByte = parseInt(req.headers["x-start-byte"], 10);
  // A missing/non-numeric header means "start from the beginning", as before.
  const xStartByte = Number.isNaN(parsedStartByte) ? 0 : parsedStartByte;
  const xFileSize = parseInt(req.headers["x-file-size"], 10);
  if (!xFileId) {
    return res.status(400).json("Missing x-file-id header");
  }
  if (xStartByte < 0) {
    return res.status(400).json("Invalid x-start-byte header");
  }
  if (xStartByte >= xFileSize) {
    return res.json("File already uploaded");
  }
  // req.busboy is absent when the busboy middleware did not handle this request
  // (e.g. a non-multipart body); piping into undefined would throw.
  if (!req.busboy) {
    return res.status(400).json("Expected a multipart/form-data body");
  }

  let parsingDone = false;
  // One "stop" callback per open write stream; removed when that stream closes.
  const pendingWrites = new Set();

  // Send at most one response even if several finish/error events fire.
  const respond = (status, body) => {
    if (!res.headersSent) {
      res.status(status).json(body);
    }
  };
  const finishIfDone = () => {
    if (parsingDone && pendingWrites.size === 0) {
      respond(200, "ok");
    }
  };
  const stopAllWrites = () => {
    pendingWrites.forEach((stopWrite) => stopWrite());
  };

  // If the connection drops before we answered, flush what has arrived and close the
  // files: those bytes are the resume point, and the descriptors must not leak.
  res.on("close", () => {
    if (!res.writableFinished) {
      stopAllWrites();
    }
  });

  req.busboy.on("file", (fieldname, file, info) => {
    let filename = database[xFileId];
    if (!filename) {
      // busboy < 1.0 passes the file name as a string; busboy >= 1.0 passes { filename, ... }.
      const clientName = typeof info === "string" ? info : info && info.filename;
      // Keep only the last path segment so names like "../../x" cannot escape uploadPath.
      filename = path.basename(String(clientName || ""));
      if (!filename || filename === "." || filename === "..") {
        file.resume(); // drain the part so busboy can keep parsing
        respond(400, "Missing or invalid file name");
        return;
      }
      database[xFileId] = filename;
    }
    const completeFilePath = path.join(uploadPath, filename);
    console.log(`Upload of '${filename}' started`);
    // Create a write stream of the new file
    let fstream;
    if (xStartByte) {
      console.log("APPEND Mode");
      // Write at the exact offset the client resumes from instead of appending ("a"):
      // if bytes from an interrupted request were still being flushed when the client
      // asked for the size, appending would duplicate them and corrupt the file.
      fstream = fs.createWriteStream(completeFilePath, {
        flags: "r+",
        start: xStartByte,
      });
    } else {
      console.log("WRITE Mode");
      fstream = fs.createWriteStream(completeFilePath, {
        flags: "w",
      });
    }
    const stopWrite = () => {
      file.unpipe(fstream);
      file.resume();
      if (!fstream.writableEnded && !fstream.destroyed) {
        fstream.end(); // flush buffered data, then close the descriptor
      }
    };
    pendingWrites.add(stopWrite);
    // Pipe it through
    file.pipe(fstream);
    file.on("error", (e) => {
      console.log("file.on.error", e);
      stopWrite();
    });
    file.on("limit", (e) => console.log("Limit reached", e));
    fstream.on("error", (err) => {
      console.log("fileStream error>>>>>", err);
      file.unpipe(fstream);
      file.resume();
      respond(500, "Could not write the uploaded file");
    });
    // On finish of the upload
    fstream.on("close", () => {
      pendingWrites.delete(stopWrite);
      console.log(`Upload of '${filename}' finished`);
      finishIfDone();
    });
  });
  req.busboy.on("finish", () => {
    parsingDone = true;
    finishIfDone();
  });
  req.busboy.on("error", (err) => {
    console.log(`Busboy error`, err);
    stopAllWrites();
    respond(500, "Malformed upload request");
  });
  // Listeners are attached first, then the body is piped through busboy.
  req.pipe(req.busboy);
});
// Start the HTTP server on the port the client script targets.
app.listen(6969, () => console.log("listening on 6969"));
client.js
var request = require("request");
var fs = require("fs");
var path = require("path");
// Name of the file to upload, resolved relative to this script's directory.
// Never reassigned, so it is a const.
const filebasename = "35gb.zip";
const filePath = path.join(__dirname, filebasename);
// Get the information about the file: its size and an id built from name, size and
// modification time (so the id changes whenever the size or mtime changes).
// Logs and rethrows if the file cannot be stat'ed.
function getFileInfo() {
  try {
    // statSync avoids opening a file descriptor that was previously never closed.
    const fileStat = fs.statSync(filePath);
    return {
      fileId: `${filebasename}-${fileStat.size}-${fileStat.mtimeMs}`,
      size: fileStat.size,
    };
  } catch (error) {
    console.error(error);
    // Rethrow: returning undefined only moved the failure to the caller's
    // destructuring, with a far less useful error message.
    throw error;
  }
}
// Identify the local file once at start-up; the request helpers read these values.
const fileInfo = getFileInfo();
const fileId = fileInfo.fileId;
const size = fileInfo.size;
// Ask the server how many bytes of this file it already holds (0 for a new upload).
// Resolves with that byte count. Rejects on a network error, a non-2xx status, or a
// response body that is not JSON with a non-negative integer `bytes`.
function info() {
  // fileId embeds the file name, which may contain characters (space, '#', '?', '/')
  // that are not safe in a URL path segment.
  const url = `http://localhost:6969/${encodeURIComponent(fileId)}`;
  const options = {
    method: "GET",
    url,
    timeout: 200000,
    headers: {
      "Content-Type": "application/json",
    },
  };
  return new Promise((resolve, reject) => {
    request(options, (err, res, body) => {
      if (err) {
        console.log(err);
        return reject(err);
      }
      if (res.statusCode < 200 || res.statusCode >= 300) {
        return reject(new Error(`Unexpected status ${res.statusCode} from ${url}`));
      }
      // Parse inside try: an exception thrown from this callback would escape as an
      // uncaught exception instead of rejecting the promise.
      let bytes;
      try {
        ({ bytes } = JSON.parse(body));
      } catch (parseError) {
        return reject(parseError);
      }
      if (!Number.isInteger(bytes) || bytes < 0) {
        return reject(new Error(`Invalid byte count in response: ${body}`));
      }
      return resolve(bytes);
    });
  });
}
// Send upload request
async function upload() {
const bytesAlreadyUploaded = await info();
let url = "http://localhost:6969/upload";
const uploadStream = fs.createReadStream(filePath, {
start: bytesAlreadyUploaded, // this will be 0 incase file is new
highWaterMark: 2 * 1024 * 1024,
});
const options = {
method: "POST",
url,
timeout: 200000,
headers: {
"Content-Type": "multipart/form-data",
"x-file-id": fileId,
"x-start-byte": bytesAlreadyUploaded,
"x-file-size": size,
"x-file-name": filebasename,
},
formData: {
image: uploadStream,
},
};
request(options, async (err, res, body) => {
if (err) {
// Basically if an error occurs, EPIPE or Connection timed out or any other error, we will resume uploading from the point where it was last uploaded
console.log(`Error ${err.code}. Resuming upload...`);
await upload();
return;
}
console.log("body", typeof body, body);
});
}
// Start the upload. Any rejection is logged and reflected in the exit code instead of
// surfacing as an unhandled promise rejection.
upload().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});
我正在使用 express 和 busboy 实现一个上传端点，这段代码运行得很好。现在我想实现可恢复上传（断点续传）的逻辑：例如，用户正在上传 10 GB 或 50 GB 的文件时网络断开，下次再上传同一个文件时，应该能从中断的位置继续上传。
我知道我需要实现另一个端点,它应该告诉客户端到目前为止已经上传了多少字节,以便客户端可以发送剩余的字节。
我不确定接下来该怎么做。我遇到的第一个问题是：上传过程中，express 会把临时文件写到操作系统的 tmp 目录。能否让临时文件改为保存在当前脚本所在的目录中？
以下是我的上传端点代码。
// Stream each uploaded file part to a file in this script's directory and reply with
// the file name once it has been written.
router.post("/upload", (req, res, next) => {
  const busboy = new Busboy({ headers: req.headers });
  busboy.on("file", (fieldname, file, filename) => {
    // Keep only the last path segment of the client-supplied name; otherwise a name
    // such as "../../server.js" would be written outside __dirname.
    const filepath = path.join(__dirname, path.basename(filename));
    const writeStream = fs.createWriteStream(filepath);
    let failed = false;
    writeStream.on("error", (err) => {
      failed = true;
      file.unpipe(writeStream);
      file.resume(); // drain the part so busboy can finish parsing
      next(err);
    });
    file.pipe(writeStream);
    writeStream.on("close", () => {
      // 'close' can also fire after an error, which has already been passed to next().
      if (!failed) {
        res.send(filename);
      }
    });
  });
  req.pipe(busboy);
});
回答我自己的问题。
我已经摸索出了一个解决方案。基本思路是：从文件上次中断的位置继续上传。我不确定这是否是最好的处理方式，但对我来说是可行的。
server.js
const express = require("express");
const busboy = require("connect-busboy");
const path = require("path");
const fs = require("fs");
const cors = require("cors");
const app = express(); // Initialize the express web server
app.use(cors());
// Insert the busboy middleware exactly once. A second bare `app.use(busboy())` was
// removed: at best it is a no-op, at worst it replaces the configured instance and
// drops the highWaterMark setting.
app.use(
  busboy({
    highWaterMark: 2 * 1024 * 1024, // Set 2MiB buffer
  })
);
const uploadPath = path.join(__dirname, "upload_data");
// fs.createWriteStream fails with ENOENT if the target directory does not exist.
fs.mkdirSync(uploadPath, { recursive: true });
// In-memory map of x-file-id -> stored file name (lost when the process restarts).
// Null prototype so client-chosen ids like "__proto__" or "constructor" cannot resolve
// to inherited Object.prototype members.
const database = Object.create(null);
// Tell the client how many bytes of the file identified by :id are already stored, so it
// can resume from there. Responds { bytes: 0 } for an unknown id or when the stored file
// cannot be read (meaning the upload starts from scratch).
app.route("/:id").get((req, res, next) => {
  const fileId = req.params.id;
  const dbFileName = database[fileId];
  if (!dbFileName) {
    return res.json({ bytes: 0 });
  }
  try {
    // statSync reads the size without opening a descriptor (openSync leaked one per call).
    const { size } = fs.statSync(path.join(uploadPath, dbFileName));
    return res.json({ bytes: size });
  } catch (error) {
    console.error(error);
    return res.json({ bytes: 0 });
  }
});
// Handle the upload post request.
// Expects a multipart/form-data body with one file part and these headers:
//   x-file-id    - client-chosen id used to find an earlier partial upload
//   x-start-byte - offset in the file at which this body's data starts (0 = new upload)
//   x-file-size  - total size of the file
// Replies only after busboy has parsed the whole body AND every write stream has closed,
// so a following GET /:id sees all bytes of this request on disk.
app.route("/upload").post((req, res, next) => {
  const xFileId = req.headers["x-file-id"];
  const parsedStartByte = parseInt(req.headers["x-start-byte"], 10);
  // A missing/non-numeric header means "start from the beginning", as before.
  const xStartByte = Number.isNaN(parsedStartByte) ? 0 : parsedStartByte;
  const xFileSize = parseInt(req.headers["x-file-size"], 10);
  if (!xFileId) {
    return res.status(400).json("Missing x-file-id header");
  }
  if (xStartByte < 0) {
    return res.status(400).json("Invalid x-start-byte header");
  }
  if (xStartByte >= xFileSize) {
    return res.json("File already uploaded");
  }
  // req.busboy is absent when the busboy middleware did not handle this request
  // (e.g. a non-multipart body); piping into undefined would throw.
  if (!req.busboy) {
    return res.status(400).json("Expected a multipart/form-data body");
  }

  let parsingDone = false;
  // One "stop" callback per open write stream; removed when that stream closes.
  const pendingWrites = new Set();

  // Send at most one response even if several finish/error events fire.
  const respond = (status, body) => {
    if (!res.headersSent) {
      res.status(status).json(body);
    }
  };
  const finishIfDone = () => {
    if (parsingDone && pendingWrites.size === 0) {
      respond(200, "ok");
    }
  };
  const stopAllWrites = () => {
    pendingWrites.forEach((stopWrite) => stopWrite());
  };

  // If the connection drops before we answered, flush what has arrived and close the
  // files: those bytes are the resume point, and the descriptors must not leak.
  res.on("close", () => {
    if (!res.writableFinished) {
      stopAllWrites();
    }
  });

  req.busboy.on("file", (fieldname, file, info) => {
    let filename = database[xFileId];
    if (!filename) {
      // busboy < 1.0 passes the file name as a string; busboy >= 1.0 passes { filename, ... }.
      const clientName = typeof info === "string" ? info : info && info.filename;
      // Keep only the last path segment so names like "../../x" cannot escape uploadPath.
      filename = path.basename(String(clientName || ""));
      if (!filename || filename === "." || filename === "..") {
        file.resume(); // drain the part so busboy can keep parsing
        respond(400, "Missing or invalid file name");
        return;
      }
      database[xFileId] = filename;
    }
    const completeFilePath = path.join(uploadPath, filename);
    console.log(`Upload of '${filename}' started`);
    // Create a write stream of the new file
    let fstream;
    if (xStartByte) {
      console.log("APPEND Mode");
      // Write at the exact offset the client resumes from instead of appending ("a"):
      // if bytes from an interrupted request were still being flushed when the client
      // asked for the size, appending would duplicate them and corrupt the file.
      fstream = fs.createWriteStream(completeFilePath, {
        flags: "r+",
        start: xStartByte,
      });
    } else {
      console.log("WRITE Mode");
      fstream = fs.createWriteStream(completeFilePath, {
        flags: "w",
      });
    }
    const stopWrite = () => {
      file.unpipe(fstream);
      file.resume();
      if (!fstream.writableEnded && !fstream.destroyed) {
        fstream.end(); // flush buffered data, then close the descriptor
      }
    };
    pendingWrites.add(stopWrite);
    // Pipe it through
    file.pipe(fstream);
    file.on("error", (e) => {
      console.log("file.on.error", e);
      stopWrite();
    });
    file.on("limit", (e) => console.log("Limit reached", e));
    fstream.on("error", (err) => {
      console.log("fileStream error>>>>>", err);
      file.unpipe(fstream);
      file.resume();
      respond(500, "Could not write the uploaded file");
    });
    // On finish of the upload
    fstream.on("close", () => {
      pendingWrites.delete(stopWrite);
      console.log(`Upload of '${filename}' finished`);
      finishIfDone();
    });
  });
  req.busboy.on("finish", () => {
    parsingDone = true;
    finishIfDone();
  });
  req.busboy.on("error", (err) => {
    console.log(`Busboy error`, err);
    stopAllWrites();
    respond(500, "Malformed upload request");
  });
  // Listeners are attached first, then the body is piped through busboy.
  req.pipe(req.busboy);
});
// Start the HTTP server on the port the client script targets.
app.listen(6969, () => console.log("listening on 6969"));
client.js
var request = require("request");
var fs = require("fs");
var path = require("path");
// Name of the file to upload, resolved relative to this script's directory.
// Never reassigned, so it is a const.
const filebasename = "35gb.zip";
const filePath = path.join(__dirname, filebasename);
// Get the information about the file: its size and an id built from name, size and
// modification time (so the id changes whenever the size or mtime changes).
// Logs and rethrows if the file cannot be stat'ed.
function getFileInfo() {
  try {
    // statSync avoids opening a file descriptor that was previously never closed.
    const fileStat = fs.statSync(filePath);
    return {
      fileId: `${filebasename}-${fileStat.size}-${fileStat.mtimeMs}`,
      size: fileStat.size,
    };
  } catch (error) {
    console.error(error);
    // Rethrow: returning undefined only moved the failure to the caller's
    // destructuring, with a far less useful error message.
    throw error;
  }
}
// Identify the local file once at start-up; the request helpers read these values.
const fileInfo = getFileInfo();
const fileId = fileInfo.fileId;
const size = fileInfo.size;
// Ask the server how many bytes of this file it already holds (0 for a new upload).
// Resolves with that byte count. Rejects on a network error, a non-2xx status, or a
// response body that is not JSON with a non-negative integer `bytes`.
function info() {
  // fileId embeds the file name, which may contain characters (space, '#', '?', '/')
  // that are not safe in a URL path segment.
  const url = `http://localhost:6969/${encodeURIComponent(fileId)}`;
  const options = {
    method: "GET",
    url,
    timeout: 200000,
    headers: {
      "Content-Type": "application/json",
    },
  };
  return new Promise((resolve, reject) => {
    request(options, (err, res, body) => {
      if (err) {
        console.log(err);
        return reject(err);
      }
      if (res.statusCode < 200 || res.statusCode >= 300) {
        return reject(new Error(`Unexpected status ${res.statusCode} from ${url}`));
      }
      // Parse inside try: an exception thrown from this callback would escape as an
      // uncaught exception instead of rejecting the promise.
      let bytes;
      try {
        ({ bytes } = JSON.parse(body));
      } catch (parseError) {
        return reject(parseError);
      }
      if (!Number.isInteger(bytes) || bytes < 0) {
        return reject(new Error(`Invalid byte count in response: ${body}`));
      }
      return resolve(bytes);
    });
  });
}
// Send upload request
async function upload() {
const bytesAlreadyUploaded = await info();
let url = "http://localhost:6969/upload";
const uploadStream = fs.createReadStream(filePath, {
start: bytesAlreadyUploaded, // this will be 0 incase file is new
highWaterMark: 2 * 1024 * 1024,
});
const options = {
method: "POST",
url,
timeout: 200000,
headers: {
"Content-Type": "multipart/form-data",
"x-file-id": fileId,
"x-start-byte": bytesAlreadyUploaded,
"x-file-size": size,
"x-file-name": filebasename,
},
formData: {
image: uploadStream,
},
};
request(options, async (err, res, body) => {
if (err) {
// Basically if an error occurs, EPIPE or Connection timed out or any other error, we will resume uploading from the point where it was last uploaded
console.log(`Error ${err.code}. Resuming upload...`);
await upload();
return;
}
console.log("body", typeof body, body);
});
}
// Start the upload. Any rejection is logged and reflected in the exit code instead of
// surfacing as an unhandled promise rejection.
upload().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});