Node.js 流如何工作?

How do Node.js Streams work?

我有一个关于 Node.js 流的问题 - 特别是它们在概念上是如何工作的。

不乏关于如何使用流的文档。但是我很难找到流在数据级别的工作方式。

我对网络通信 HTTP 的有限理解是来回发送完整的 "packages" 数据。类似于个人订购公司目录,客户端向服务器发送 GET(目录)请求,服务器以目录作为响应。浏览器接收的不是目录的一页,而是整本书。

节点流可能是多部分消息吗?

我喜欢 REST 模型——尤其是它是无状态的。浏览器和服务器之间的每一次交互都是完全独立且足够的。因此,节点流不是 RESTful 吗?一位开发人员提到了与套接字管道的相似性,后者使连接保持打开状态。回到我的目录订购示例,这会像带有行 "But wait! There's more!" 而不是完整目录的电视广告吗?

流的很大一部分是接收器 'down-stream' 向上游发送 'pause' 和 'continue' 等消息的能力。这些消息由什么组成?他们是POST吗?

最后,我对 Node 工作原理的有限视觉理解包括这个事件循环。函数可以放在与线程池不同的线程上,事件循环继续进行。但是发送数据流不应该让事件循环占用(即停止)直到流完成吗?它如何同时监视来自下游的 'pause' 请求?n 事件循环是否将流放在池中的另一个线程上,并且当它遇到 'pause' 请求时,检索相关线程并暂停它?

我已经阅读了 node.js 文档,完成了 nodeschool 教程,构建了一个 heroku 应用程序,购买了两本书(真正的,独立的,书籍,有点像之前所说的目录,可能不像节点流),询问了代码训练营的几位 "node" 讲师 - 所有人都在谈论如何使用流,但 none 谈论下面实际发生的事情。

也许您遇到了解释这些工作原理的好资源?对于非 CS 思维来说,也许是一个很好的拟人化类比?

首先要注意的是:node.js 流不限于 HTTP 请求。 HTTP 请求/网络资源只是 node.js.

中流的一个示例

流对于所有可以小块处理的东西都很有用。它们使您能够更轻松地以适合您的 RAM 的更小块来处理潜在的巨大资源。

假设您有一个文件(大小为几千兆字节)并且想要将所有小写字符转换为大写字符并将结果写入另一个文件。天真的方法将使用 fs.readFile 读取整个文件(为简洁起见省略了错误处理):

fs.readFile('my_huge_file', function (err, data) {
    var convertedData = data.toString().toUpperCase();

    fs.writeFile('my_converted_file', convertedData);
});

不幸的是,这种方法很容易使您的 RAM 不堪重负,因为在处理之前必须存储整个文件。您还会浪费宝贵的时间等待文件被读取。以较小的块处理文件是否有意义?您可以在等待硬盘提供剩余数据的同时,在获得第一个字节后立即开始处理:

var readStream = fs.createReadStream('my_huge_file');
var writeStream = fs.createWriteStream('my_converted_file');
readStream.on('data', function (chunk) {
    var convertedChunk = chunk.toString().toUpperCase();
    writeStream.write(convertedChunk);
});
readStream.on('end', function () {
    writeStream.end();
});

这种方法要好得多:

  1. 您将只处理很容易装入 RAM 的一小部分数据。
  2. 您在第一个字节到达后就开始处理,不要浪费时间什么都不做,而是等待。

打开流后 node.js 将打开文件并开始读取。一旦操作系统将一些字节传递给正在读取文件的线程,它就会被传递给您的应用程序。


回到 HTTP 流:

  1. 第一期在这里也有效。攻击者可能会向您发送大量数据以淹没您的 RAM 并关闭 (DoS) 您的服务。
  2. 然而,在这种情况下,第二个问题更为重要: 网络可能非常慢(想想智能手机),并且可能需要很长时间才能让客户端发送所有内容。通过使用流,您可以开始处理请求并缩短响应时间。

关于暂停 HTTP 流:这不是在 HTTP 级别完成的,而是更低的。如果你暂停流 node.js 将简单地停止从底层 TCP 套接字读取。 然后发生的事情取决于内核。它可能仍会缓冲传入的数据,因此一旦您完成当前工作,它就可以为您准备就绪。 It may also inform the sender at the TCP level that it should pause sending data。应用程序不需要处理那个。那是他们的业务 none。事实上,发件人应用程序可能甚至没有意识到您不再积极阅读!

因此,这基本上是关于在数据可用时立即提供数据,但又不会占用您的资源。底层的艰苦工作是由操作系统(例如 netfshttp)或您正在使用的流的作者(例如 zlib 这是Transform 流,通常固定在 fsnet) 上。

我认为你想太多了,我喜欢它。

哪些流适合

流有两个好处:

  • 当一个操作很慢时,它可以在获得部分结果时为您提供部分结果。例如读取一个文件,它很慢,因为 HDD 很慢,它可以在读取文件时为您提供部分文件。使用流,您可以使用文件的这些部分并立即开始处理它们。

  • 它们也很适合将程序连接在一起(读取函数)。就像在命令行中一样,您可以通过管道将不同的程序组合在一起以产生所需的输出。示例:cat file | grep word.

它们是如何在后台工作的...

这些操作中的大多数需要时间来处理,并且可以在获得它们时为您提供部分结果,这些操作不是由 Node.js 完成的,它们是由 V8 JS 引擎完成的,它只将这些结果交给 JS你和他们一起工作。

要了解您的 http 示例,您需要了解 http 的工作原理

可以使用不同的编码发送网页。一开始只有一种方法。请求时发送整个页面的位置。现在它有更有效的编码来做到这一点。其中之一是分块发送网页的一部分,直到发送整个页面。这很好,因为可以在收到网页时对其进行处理。想象一下网络浏览器。它可以在下载完成之前开始呈现网站。

您的 .pause 和 .continue 问题

首先,Node.js 流只能在同一个 Node.js 程序中工作。 Node.js 流无法与其他服务器甚至程序中的流交互。

这意味着在下面的示例中,Node.js 无法与网络服务器通信。它无法告诉它暂停或恢复。

Node.js <-> Network <-> Webserver

真正发生的是 Node.js 请求一个网页并开始下载它并且没有办法停止该下载。只是放下套接字。

那么,当您在 Node.js .pause 或 .continue 中进行时,究竟会发生什么?

它开始缓冲请求,直到您准备好再次使用它。但是下载一直没有停止。

事件循环

我准备了一个完整的答案来解释事件循环是如何工作的,但我认为你最好 watch this talk

下图似乎是节点流 class.

的 10.000 英尺概览/图表相当准确

代表streams3,贡献者Chris Dickinson

那么首先,什么是流? 好吧,有了流,我们可以处理意味着读取和写入 数据一段一段 而无需完成整个读取或写入操作。因此我们不必将所有数据都保存在内存中来执行这些操作。

例如,当我们使用流读取文件时,我们读取部分数据,对其进行处理,然后释放我们的内存,并重复此操作,直到处理完整个文件。或者想想 YouTube 或 Netflix,它们都被称为流媒体公司,因为它们流式传输视频使用相同的原理

因此,不是等到整个视频文件加载完毕,而是逐个或分块进行处理,这样您甚至可以在整个文件下载完成之前就开始观看。所以这里的原理不仅仅是关于Node.JS。但普遍适用于计算机科学。

因此,如您所见,这使得流成为处理大量数据(例如视频或我们从外部来源逐条接收的数据)的理想选择。此外,流式传输使数据处理在内存方面更有效率,因为不需要将所有数据都保存在内存中,而且在时间方面也是如此,因为我们可以在数据到达时开始处理, 而不是等待一切都到达。

它们在Node.JS中是如何实现的:

所以在 Node 中,有四种基本类型的流: 可读流、可写流、双工流和转换流。但是可读和可写是最重要的,可读流是我们可以从中读取和消费数据的流。 Streams在Node的核心模块中无处不在,比如http服务器收到请求进来的数据其实就是一个可读的流。因此,随请求一起发送的所有数据都是一块一块地而不是一大块。另外,文件系统的另一个例子是,我们可以使用 FS 模块的读取屏幕逐个读取文件,这实际上对于大型文本文件非常有用。

好吧,另一个需要注意的重要事项是流实际上是 EventEmitter class 的实例。这意味着所有流都可以发出和收听命名事件。在可读流的情况下,它们可以发出,我们可以监听许多不同的事件。但最重要的两个是数据结束事件当有新的数据要使用时触发数据事件,并且一旦没有更多数据要使用[=75]就会触发结束事件=].当然,我们可以相应地对这些事件做出反应。

最后,除了事件,我们还有可以在流上使用的重要函数。在可读流的情况下,最重要的是 piperead functions。超级重要的管道功能,它基本上允许我们将流连接在一起,将数据从一个流传递到另一个流,而根本不必担心事件。

接下来,可写流 是我们可以写入数据的流。所以基本上,与可读流相反。一个很好的例子是我们可以发送回客户端的 http 响应,它实际上是一个可写流。所以我们可以写入数据的流。所以当我们要发送数据时,我们必须把它写在某个地方,对吧?某处是一个可写流,这很有意义,对吧?

例如,如果我们想向客户发送一个大视频文件,我们就像 Netflix 或 YouTube 那样。现在关于事件,最重要的是 drain 和 finish 事件。最重要的功能是写入和结束功能。

关于双工流。它们只是同时可读和可写的流。这些不太常见。但无论如何,一个很好的例子是来自 net 模块的网络套接字。 Web 套接字基本上只是客户端和服务器之间的双向通信通道,一旦建立连接就保持打开状态。

最后,转换流是双工流,即既可读又可写的流,同时可以在读取或写入数据时修改或转换数据。这方面的一个很好的例子是 zlib 核心模块来压缩实际上使用转换流的数据。

*** Node 将这些 http 请求和响应实现为流,然后我们可以消费,我们可以使用每种流类型可用的事件和函数来使用它们。我们当然也可以实现我们自己的流,然后使用这些相同的事件和函数来使用它们。

现在让我们尝试一些例子:

const fs = require('fs');
const server = require('http').createServer();
server.on('request', (req, res) =>{
    fs.readFile('./txt/long_file.txt', (err, data)=>{
        if(err) console.log(err);
        res.end(data);
    });
});
server.listen('8000','127.0.01', ()=>{
    console.log(this);
});

假设long_file.txt文件包含1000000K行并且每行包含超过100个单词,所以这是一个包含大量数据的hug文件,现在在上面的例子中问题是使用readFile()函数节点会将整个文件加载到内存中,因为只有将整个文件加载到内存中节点才能将数据作为响应对象传输。

当文件很大,并且有大量请求到达您的服务器时,通过时间节点进程将很快 运行 耗尽资源并且您的应用程序将停止工作,一切都会崩溃。

让我们尝试使用流来找到解决方案:

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) =>{
    const readable = fs.createReadStream('./txt/long_file.txt');
    readable.on('data', chunk=>{
        res.write(chunk);
    });
    readable.on('end',()=>{
        res.end();
    })
    readable.on('error', err=>{
        console.log('err');
        res.statusCode=500;
        res.end('File not found');
    });
});

server.listen('8000','127.0.01', ()=>{
    console.log(this);
});

在上面的流示例中,我们正在有效地流式传输文件,我们正在读取文件的一部分,一旦可用,我们就将其直接发送给客户端,使用的写入方法响应流。然后,当下一个图片可用时,将发送该图片,一直到读取整个文件并将其流式传输到客户端。

所以流基本上完成了从文件中读取数据,将发出结束事件以表示不再向此可写流写入数据。

通过上面的实践,我们解决了前面的问题,但是,上面的例子仍然存在一个很大的问题,称为背压。

问题是我们的可读流,也就是我们用来从磁盘读取文件的流,比通过网络实际发送响应可写流的结果要快得多。这将使响应流不堪重负,无法如此快速地处理所有这些传入数据,这个问题称为背压。

解决方案是使用管道运算符,它将处理数据传入和传出的速度。

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) =>{
    const readable = fs.createReadStream('./txt/long_file.txt');
    readable.pipe(res);

});

server.listen('8000','127.0.01', ()=>{
    console.log(this);
});