在 Node.JS 中创建读取流

createReadStream in Node.JS

所以我使用了 fs.readFile() 它给了我

"FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - process out of memory"

因为 fs.readFile() 在调用回调之前将整个文件加载到内存中,我应该改用 fs.createReadStream() 吗?

这就是我之前使用 readFile 所做的:

fs.readFile('myfile.json', function (err1, data) {
    if (err1) {
        console.error(err1);
    } else {
        var myData = JSON.parse(data);
        //Do some operation on myData here
    }
}

抱歉,我是流媒体新手;以下是使用流媒体做同样事情的正确方法吗?

var readStream = fs.createReadStream('myfile.json');

readStream.on('end', function () {  
    readStream.close();
    var myData = JSON.parse(readStream);
    //Do some operation on myData here
});

谢谢

如果文件很大,那么是的,流媒体将是您想要的处理方式。但是,您在第二个示例中所做的是让流将所有文件数据缓冲到内存中,然后在 end 上处理它。它与 readFile 本质上没有什么不同。

你会想看看 JSONStream。流式处理意味着您希望在数据流过时对其进行处理。在你的情况下,你显然 必须 这样做,因为你不能一次将整个文件缓冲到内存中。考虑到这一点,希望这样的代码有意义:

JSONStream.parse('rows.*.doc')

注意它有一种查询模式。那是因为您不会同时使用文件中的整个 JSON object/array,所以您必须更多地考虑 JSONStream 如何处理数据它找到的数据

您可以使用 JSONStream 从根本上查询您感兴趣的 JSON 数据。这样您永远不会将整个文件缓冲到内存中。它确实有一个缺点,如果你确实需要所有数据,那么你将不得不多次流式传输文件,使用 JSONStream 只提取你当时需要的数据,但在你的情况下你没有太多选择。

您还可以使用 JSONStream 按顺序解析数据,然后将其转储到数据库中。

JSONStream.parse 类似于 JSON.parse 但它 returns 不是返回整个对象,而是一个流。当解析流获得足够的数据以形成与您的查询匹配的整个对象时,它将发出一个 data 事件,其中数据是与您的查询匹配的文档。配置数据处理程序后,您可以将读取流通过管道传输到解析流中,然后观看神奇的事情发生。

示例:

var JSONStream = require('JSONStream');
var readStream = fs.createReadStream('myfile.json');
var parseStream = JSONStream.parse('rows.*.doc');
parseStream.on('data', function (doc) {
  db.insert(doc); // pseudo-code for inserting doc into a pretend database.
});
readStream.pipe(parseStream);

这是帮助您了解正在发生的事情的详细方式。这是一个更简洁的方法:

var JSONStream = require('JSONStream');
fs.createReadStream('myfile.json')
  .pipe(JSONStream.parse('rows.*.doc'))
  .on('data', function (doc) {
    db.insert(doc);
  });

编辑:

为了进一步清楚发生了什么,试着这样想。假设您有一个巨大的湖泊,您想要对水进行处理以净化它并将水转移到一个新的水库。如果你有一架巨大的魔法直升机和一个巨大的水桶,那么你就可以飞过湖面,将湖水放入水桶中,向其中添加处理化学品,然后将其飞向目的地。

问题当然是没有这样的直升机可以处理那么大的重量或体积。这根本不可能,但这并不意味着我们不能以不同的方式实现我们的目标。因此,您改为在湖泊和新水库之间建造一系列河流(溪流)。然后,您在这些河流中设置净化站,以净化流经其中的所有水。这些站点可以以多种方式运行。也许处理可以做到如此之快,以至于你可以让河流自由流动,当水以最大速度顺流而下时,净化就会发生。

也有可能是水的处理需要一定的时间,或者站点需要一定的水量才能有效处理。所以你设计你的河流有闸门,你控制水从湖流入你的河流,让水站缓冲他们需要的水,直到他们完成他们的工作并将净化后的水释放到下游并进入最终目的地目的地。

这几乎正是您想对数据执行的操作。解析流是你的清理站,它缓冲数据直到它有足够的数据来形成一个与你的查询匹配的完整文档,然后它只将该数据推送到下游(并发出 data 事件)。

节点流很好,因为大多数时候您不必处理打开和关闭门的问题。节点流足够智能,可以在流缓冲一定量的数据时控制回流。就好像净化站和湖上的闸门在互相交谈以计算出完美的流量。

如果您有流式数据库驱动程序,那么理论上您可以创建某种插入流,然后执行 parseStream.pipe(insertStream) 而不是手动处理 data 事件 :D。这是在另一个文件中创建 JSON 文件的过滤版本的示例。

fs.createReadStream('myfile.json')
  .pipe(JSONStream.parse('rows.*.doc'))
  .pipe(JSONStream.stringify())
  .pipe(fs.createWriteStream('filtered-myfile.json'));