Node: fs write() doesn't write inside loop. Why not?

I want to create a write stream and write to it as my data comes in. However, while I'm able to create the file, nothing gets written to it. Eventually, the process runs out of memory.

The problem I've found is that I'm calling write() inside a loop.

Here's a simple example:

'use strict'

var fs = require('fs');
var wstream = fs.createWriteStream('myOutput.txt');

for (var i = 0; i < 10000000000; i++) {
    wstream.write(i+'\n');
}

console.log('End!')
wstream.end();

Nothing gets written, not even a hello. But why? How can I write to a file within a loop?

The problem is that you never give it a chance to drain the buffer. Eventually the buffer fills up, and you run out of memory.

WriteStream.write returns a boolean indicating whether it's safe to keep writing immediately (i.e. the internal buffer is still below its highWaterMark). If it returns false, you should wait for the drain event, which indicates that the buffer has been drained.

Here's one way to write the code using write's return value and the drain event:

'use strict'

var fs = require('fs');
var wstream = fs.createWriteStream('myOutput.txt');

function writeToStream(i) {
  for (; i < 10000000000; i++) {
    if (!wstream.write(i + '\n')) {
      // Wait for it to drain then start writing data from where we left off
      wstream.once('drain', function() {
        writeToStream(i + 1);
      });
      return;
    }
  }
  console.log('End!')
  wstream.end();
}

writeToStream(0);

To add to @MikeC's answer, here are some relevant details from the current docs (v8.4.0) for writable.write():

If false is returned, further attempts to write data to the stream should stop until the 'drain' event is emitted.

While a stream is not draining, calls to write() will buffer chunk, and return false. Once all currently buffered chunks are drained (accepted for delivery by the operating system), the 'drain' event will be emitted. It is recommended that once write() returns false, no more chunks be written until the 'drain' event is emitted. While calling write() on a stream that is not draining is allowed, Node.js will buffer all written chunks until maximum memory usage occurs, at which point it will abort unconditionally. Even before it aborts, high memory usage will cause poor garbage collector performance and high RSS (which is not typically released back to the system, even after the memory is no longer required).

And from the guide on backpressuring in streams:

In any scenario where the data buffer has exceeded the highWaterMark or the write queue is currently busy, .write() will return false.

When a false value is returned, the backpressure system kicks in.

Once the data buffer is emptied, a .drain() event will be emitted and resume the incoming data flow.

Once the queue is finished, backpressure will allow data to be sent again. The space in memory that was being used will free itself up and prepare for the next batch of data.

               +-------------------+         +=================+
               |  Writable Stream  +--------->  .write(chunk)  |
               +-------------------+         +=======+=========+
                                                     |
                                  +------------------v---------+
   +-> if (!chunk)                |    Is this chunk too big?  |
   |     emit .end();             |    Is the queue busy?      |
   +-> else                       +-------+----------------+---+
   |     emit .write();                   |                |
   ^                                   +--v---+        +---v---+
   ^-----------------------------------<  No  |        |  Yes  |
                                       +------+        +---v---+
                                                           |
           emit .pause();          +=================+     |
           ^-----------------------+  return false;  <-----+---+
                                   +=================+         |
                                                               |
when queue is empty     +============+                         |
^-----------------------<  Buffering |                         |
|                       |============|                         |
+> emit .drain();       |  ^Buffer^  |                         |
+> emit .resume();      +------------+                         |
                        |  ^Buffer^  |                         |
                        +------------+   add chunk to queue    |
                        |            <---^---------------------<
                        +============+

Here are some visualisations (running the scripts with a V8 heap memory size of 512MB via --max-old-space-size=512).

This visualisation shows the heap memory usage (red) and the delta time (purple) for every 10,000 steps of i (the X axis shows i):

'use strict'

var fs = require('fs');
var wstream = fs.createWriteStream('myOutput.txt');
var latestTime = (new Date()).getTime();
var currentTime;

for (var i = 0; i < 10000000000; i++) {
    wstream.write(i+'\n');
    if (i % 10000 === 0) {
        currentTime = (new Date()).getTime();
        console.log([  // Output CSV data for visualisation
            i,
            (currentTime - latestTime) / 5,
            process.memoryUsage().heapUsed / (1024 * 1024)
        ].join(','));
        latestTime = currentTime;
    }
}

console.log('End!')
wstream.end();

The script runs more and more slowly as the memory usage approaches the 512MB maximum limit, until it finally crashes when the limit is reached.


This visualisation uses v8.setFlagsFromString() with --trace_gc to show the current memory usage (red) and execution time (purple) of each garbage collection (the X axis shows the total running time in seconds):

'use strict'

var fs = require('fs');
var v8 = require('v8');
var wstream = fs.createWriteStream('myOutput.txt');

v8.setFlagsFromString('--trace_gc');

for (var i = 0; i < 10000000000; i++) {
    wstream.write(i+'\n');
}

console.log('End!')
wstream.end();

Memory usage reaches 80% after about 4 seconds, and the garbage collector gives up trying to Scavenge and is forced to use Mark-sweep (more than 10 times slower) – see this article for more details.


For comparison, here is the same visualisation for @MikeC's code, which waits for drain when the write buffer becomes full:

To add (even more) to @Mike Cluck's answer, I implemented a solution with the same behaviour using Node stream pipe(). Maybe it will be useful to someone. Per the docs (Node 11.13.0):

The readable.pipe() method attaches a Writable stream to the readable, causing it to switch automatically into flowing mode and push all of its data to the attached Writable. The flow of data will be automatically managed so that the destination Writable stream is not overwhelmed by a faster Readable stream.

So pipe() provides a backpressure strategy out of the box. All that's needed is to create a Readable stream somehow. In my example, I extend the Readable class from the Node stream module to create a simple counter:

const { Readable } = require('stream');
const fs = require('fs');
const writeStream = fs.createWriteStream('./bigFile.txt');

class Counter extends Readable {
    constructor(opt) {
        super(opt);
        this._max = 1e7;
        this._index = 1;
    }

    _read() {
        const i = this._index++;
        if (i > this._max)
            this.push(null);
        else {
            this.push(i + '\n');
        }
    }
}

new Counter().pipe(writeStream); 

The behaviour is exactly the same - data is continuously pushed to the file in small chunks, and memory consumption is constant (about 50MB on my machine).

The beauty of pipe() is that if you're given a readable stream (e.g. from a request), all you need to do is use: readable.pipe(writable)