接收大文件时,如何使用 gridfsbucket 将大文件传输到 mongodb?

How to pipe large file into mongodb using gridfsbucket, when receiving file in chunks?

我正在使用 nodejs、mongodb 和 gridfsbucket。

我正在将一个文件以 255 字节的块形式接收到我的服务器,这些文件可能非常大,因此创建一个变量来存储这些块然后使用 gridfsbucket 将其通过管道传输到 mongo 不是一个可行的选择。

目前我有一种工作方法,可以将文件临时存储在磁盘上,然后将其通过管道传输到 mongo。这实际上工作得很好,唯一的问题是我不想在使用 gridfsbucket 流入 mongo 之前临时存储数据。

有谁知道如何在这些块进入我的服务器时获取它们并立即使用 gridfsbucket 将它们流式传输到 mongo?我在想我需要打开管道,然后不断地将块流式传输到管道中,但我不确定如何完成此操作。

这是我当前存储到磁盘的代码:

fs.appendFileSync(this.tempFileName, Buffer.from(this.currfile.currdatachunk));
this.currfile.filename = this.currfile.filename.replace(")", Date.now() + ")");
var fileName = decodeURI(this.currfile.filename.replace("$(", "").replace(")", ""));
 fileName = encodeURI(fileName);
 var self = this;
 var gb = new GridFSBucket(mongoCacheDb, { bucketName: this.cacheCollection });
 var uploadStream = gb.openUploadStream(fileName);

 uploadStream.options.metadata = {
     'url': "/getFile/" + fileName,
     'id': uploadStream.id
 }

 uploadStream.once("finish", function uploadStream_onceFinish() {
                        if (this.length > 0) {
                            var ms = new Message();
                            ms.data = self.cacheCollection + "/" + self.currfile.filename;
                            ms.datatype = "URL";
                            ms.hasdata = "yes";
                            ms.haserrors = "no";
                            ms.type = "APPXLOADURL";
                            sendMessage(ws, ms);

                            /*Send response to server indicating file receipt*/
                            var nameAB = Buffer.from(self.currfile.filename);
                            self.clientsocket.write(Buffer.from(hton32(nameAB.length)));
                            self.clientsocket.write(Buffer.from(nameAB));
                            self.clientsocket.write(Buffer.from([3, 1]));
                            console.log("Finished: " + Date.now());
                        } else {
                            var nameAB = Buffer.from(self.currfile.filename);
                            self.clientsocket.write(Buffer.from(hton32(nameAB.length)));
                            self.clientsocket.write(Buffer.from(nameAB));
                            self.clientsocket.write(Buffer.from([0, 0]));
                        }
                        fs.unlinkSync(self.tempFileName);
                    });
                    fs.createReadStream(this.tempFileName).pipe(uploadStream);

我想我应该再等一天。终于把我的头围起来,不再想太多了。我在第一个 运行 上创建了管道,然后在收到缓冲块时将其推送到管道中。代码如下:

if (this.chunksReceived == 0) {
    this.rStream = new Readable({ read(size) { } });

    this.currfile.filename = this.currfile.filename.replace(")", Date.now() + ")");
    var fileName = decodeURI(this.currfile.filename.replace("$(", "").replace(")", ""));
    fileName = encodeURI(fileName);
    var self = this;
    var gb = new GridFSBucket(mongoCacheDb, { bucketName: this.cacheCollection });
    this.uploadStream = gb.openUploadStream(fileName);

    this.uploadStream.options.metadata = {
        'url': "/getFile/" + fileName,
        'id': this.uploadStream.id
    }

    this.uploadStream.once("finish", function uploadStream_onceFinish() {
        if (this.length > 0) {
            var ms = new Message();
            ms.data = self.cacheCollection + "/" + self.currfile.filename;
            ms.datatype = "URL";
            ms.hasdata = "yes";
            ms.haserrors = "no";
            ms.type = "APPXLOADURL";
            sendMessage(ws, ms);

            /*Send response to server indicating file receipt*/
            var nameAB = Buffer.from(self.currfile.filename);
            self.clientsocket.write(Buffer.from(hton32(nameAB.length)));
            self.clientsocket.write(Buffer.from(nameAB));
            self.clientsocket.write(Buffer.from([3, 1]));
            console.log("Finished: " + Date.now());
        } else {
            var nameAB = Buffer.from(self.currfile.filename);
            self.clientsocket.write(Buffer.from(hton32(nameAB.length)));
            self.clientsocket.write(Buffer.from(nameAB));
            self.clientsocket.write(Buffer.from([0, 0]));
        }
    });

    this.rStream.pipe(this.uploadStream);
}
this.rStream.push(Buffer.from(this.currfile.currdatachunk));
this.chunksReceived++;