接收大文件时,如何使用 gridfsbucket 将大文件传输到 mongodb?
How to pipe large file into mongodb using gridfsbucket, when receiving file in chunks?
我正在使用 nodejs、mongodb 和 gridfsbucket。
我正在将一个文件以 255 字节的块形式接收到我的服务器,这些文件可能非常大,因此创建一个变量来存储这些块然后使用 gridfsbucket 将其通过管道传输到 mongo 不是一个可行的选择。
目前我有一种工作方法,可以将文件临时存储在磁盘上,然后将其通过管道传输到 mongo。这实际上工作得很好,唯一的问题是我不想在使用 gridfsbucket 流入 mongo 之前临时存储数据。
有谁知道如何在这些块进入我的服务器时获取它们并立即使用 gridfsbucket 将它们流式传输到 mongo?我在想我需要打开管道,然后不断地将块流式传输到管道中,但我不确定如何完成此操作。
这是我当前存储到磁盘的代码:
fs.appendFileSync(this.tempFileName, Buffer.from(this.currfile.currdatachunk));
this.currfile.filename = this.currfile.filename.replace(")", Date.now() + ")");
var fileName = decodeURI(this.currfile.filename.replace("$(", "").replace(")", ""));
fileName = encodeURI(fileName);
var self = this;
var gb = new GridFSBucket(mongoCacheDb, { bucketName: this.cacheCollection });
var uploadStream = gb.openUploadStream(fileName);
uploadStream.options.metadata = {
'url': "/getFile/" + fileName,
'id': uploadStream.id
}
uploadStream.once("finish", function uploadStream_onceFinish() {
if (this.length > 0) {
var ms = new Message();
ms.data = self.cacheCollection + "/" + self.currfile.filename;
ms.datatype = "URL";
ms.hasdata = "yes";
ms.haserrors = "no";
ms.type = "APPXLOADURL";
sendMessage(ws, ms);
/*Send response to server indicating file receipt*/
var nameAB = Buffer.from(self.currfile.filename);
self.clientsocket.write(Buffer.from(hton32(nameAB.length)));
self.clientsocket.write(Buffer.from(nameAB));
self.clientsocket.write(Buffer.from([3, 1]));
console.log("Finished: " + Date.now());
} else {
var nameAB = Buffer.from(self.currfile.filename);
self.clientsocket.write(Buffer.from(hton32(nameAB.length)));
self.clientsocket.write(Buffer.from(nameAB));
self.clientsocket.write(Buffer.from([0, 0]));
}
fs.unlinkSync(self.tempFileName);
});
fs.createReadStream(this.tempFileName).pipe(uploadStream);
我想我应该再等一天。终于把我的头围起来,不再想太多了。我在第一个 运行 上创建了管道,然后在收到缓冲块时将其推送到管道中。代码如下:
if (this.chunksReceived == 0) {
this.rStream = new Readable({ read(size) { } });
this.currfile.filename = this.currfile.filename.replace(")", Date.now() + ")");
var fileName = decodeURI(this.currfile.filename.replace("$(", "").replace(")", ""));
fileName = encodeURI(fileName);
var self = this;
var gb = new GridFSBucket(mongoCacheDb, { bucketName: this.cacheCollection });
this.uploadStream = gb.openUploadStream(fileName);
this.uploadStream.options.metadata = {
'url': "/getFile/" + fileName,
'id': this.uploadStream.id
}
this.uploadStream.once("finish", function uploadStream_onceFinish() {
if (this.length > 0) {
var ms = new Message();
ms.data = self.cacheCollection + "/" + self.currfile.filename;
ms.datatype = "URL";
ms.hasdata = "yes";
ms.haserrors = "no";
ms.type = "APPXLOADURL";
sendMessage(ws, ms);
/*Send response to server indicating file receipt*/
var nameAB = Buffer.from(self.currfile.filename);
self.clientsocket.write(Buffer.from(hton32(nameAB.length)));
self.clientsocket.write(Buffer.from(nameAB));
self.clientsocket.write(Buffer.from([3, 1]));
console.log("Finished: " + Date.now());
} else {
var nameAB = Buffer.from(self.currfile.filename);
self.clientsocket.write(Buffer.from(hton32(nameAB.length)));
self.clientsocket.write(Buffer.from(nameAB));
self.clientsocket.write(Buffer.from([0, 0]));
}
});
this.rStream.pipe(this.uploadStream);
}
this.rStream.push(Buffer.from(this.currfile.currdatachunk));
this.chunksReceived++;
我正在使用 nodejs、mongodb 和 gridfsbucket。
我正在将一个文件以 255 字节的块形式接收到我的服务器,这些文件可能非常大,因此创建一个变量来存储这些块然后使用 gridfsbucket 将其通过管道传输到 mongo 不是一个可行的选择。
目前我有一种工作方法,可以将文件临时存储在磁盘上,然后将其通过管道传输到 mongo。这实际上工作得很好,唯一的问题是我不想在使用 gridfsbucket 流入 mongo 之前临时存储数据。
有谁知道如何在这些块进入我的服务器时获取它们并立即使用 gridfsbucket 将它们流式传输到 mongo?我在想我需要打开管道,然后不断地将块流式传输到管道中,但我不确定如何完成此操作。
这是我当前存储到磁盘的代码:
fs.appendFileSync(this.tempFileName, Buffer.from(this.currfile.currdatachunk));
this.currfile.filename = this.currfile.filename.replace(")", Date.now() + ")");
var fileName = decodeURI(this.currfile.filename.replace("$(", "").replace(")", ""));
fileName = encodeURI(fileName);
var self = this;
var gb = new GridFSBucket(mongoCacheDb, { bucketName: this.cacheCollection });
var uploadStream = gb.openUploadStream(fileName);
uploadStream.options.metadata = {
'url': "/getFile/" + fileName,
'id': uploadStream.id
}
uploadStream.once("finish", function uploadStream_onceFinish() {
if (this.length > 0) {
var ms = new Message();
ms.data = self.cacheCollection + "/" + self.currfile.filename;
ms.datatype = "URL";
ms.hasdata = "yes";
ms.haserrors = "no";
ms.type = "APPXLOADURL";
sendMessage(ws, ms);
/*Send response to server indicating file receipt*/
var nameAB = Buffer.from(self.currfile.filename);
self.clientsocket.write(Buffer.from(hton32(nameAB.length)));
self.clientsocket.write(Buffer.from(nameAB));
self.clientsocket.write(Buffer.from([3, 1]));
console.log("Finished: " + Date.now());
} else {
var nameAB = Buffer.from(self.currfile.filename);
self.clientsocket.write(Buffer.from(hton32(nameAB.length)));
self.clientsocket.write(Buffer.from(nameAB));
self.clientsocket.write(Buffer.from([0, 0]));
}
fs.unlinkSync(self.tempFileName);
});
fs.createReadStream(this.tempFileName).pipe(uploadStream);
我想我应该再等一天。终于把我的头围起来,不再想太多了。我在第一个 运行 上创建了管道,然后在收到缓冲块时将其推送到管道中。代码如下:
if (this.chunksReceived == 0) {
this.rStream = new Readable({ read(size) { } });
this.currfile.filename = this.currfile.filename.replace(")", Date.now() + ")");
var fileName = decodeURI(this.currfile.filename.replace("$(", "").replace(")", ""));
fileName = encodeURI(fileName);
var self = this;
var gb = new GridFSBucket(mongoCacheDb, { bucketName: this.cacheCollection });
this.uploadStream = gb.openUploadStream(fileName);
this.uploadStream.options.metadata = {
'url': "/getFile/" + fileName,
'id': this.uploadStream.id
}
this.uploadStream.once("finish", function uploadStream_onceFinish() {
if (this.length > 0) {
var ms = new Message();
ms.data = self.cacheCollection + "/" + self.currfile.filename;
ms.datatype = "URL";
ms.hasdata = "yes";
ms.haserrors = "no";
ms.type = "APPXLOADURL";
sendMessage(ws, ms);
/*Send response to server indicating file receipt*/
var nameAB = Buffer.from(self.currfile.filename);
self.clientsocket.write(Buffer.from(hton32(nameAB.length)));
self.clientsocket.write(Buffer.from(nameAB));
self.clientsocket.write(Buffer.from([3, 1]));
console.log("Finished: " + Date.now());
} else {
var nameAB = Buffer.from(self.currfile.filename);
self.clientsocket.write(Buffer.from(hton32(nameAB.length)));
self.clientsocket.write(Buffer.from(nameAB));
self.clientsocket.write(Buffer.from([0, 0]));
}
});
this.rStream.pipe(this.uploadStream);
}
this.rStream.push(Buffer.from(this.currfile.currdatachunk));
this.chunksReceived++;