_read() 未在可读流上实现

_read() is not implemented on Readable stream

本题是如何真正实现一个可读流的read方法。

我有一个可读流的实现:

import {Readable} from "stream";
this.readableStream = new Readable();

我遇到了这个错误

events.js:136 throw er; // Unhandled 'error' event ^

Error [ERR_STREAM_READ_NOT_IMPLEMENTED]: _read() is not implemented at Readable._read (_stream_readable.js:554:22) at Readable.read (_stream_readable.js:445:10) at resume_ (_stream_readable.js:825:12) at _combinedTickCallback (internal/process/next_tick.js:138:11) at process._tickCallback (internal/process/next_tick.js:180:9) at Function.Module.runMain (module.js:684:11) at startup (bootstrap_node.js:191:16) at bootstrap_node.js:613:3

错误发生的原因很明显,我们需要这样做:

  this.readableStream = new Readable({
      read(size) {
        return true;
      }
    });

虽然我不太明白如何实现读取方法。

唯一有用的就是调用

this.readableStream.push('some string or buffer');

如果我尝试这样做:

   this.readableStream = new Readable({
          read(size) {
            this.push('foo');   // call push here!
            return true;
          }
     });

然后什么也没有发生 - 可读性中没有任何结果!

此外,这些文章说您不需要实现 read 方法:

https://github.com/substack/stream-handbook#creating-a-readable-stream

https://medium.freecodecamp.org/node-js-streams-everything-you-need-to-know-c9141306be93

我的问题是 - 为什么在 read 方法中调用 push 什么都不做?唯一对我有用的就是在别处调用 readable.push()。

why does calling push inside the read method do nothing? The only thing that works for me is just calling readable.push() elsewhere.

我认为这是因为您没有使用它,您需要将其通过管道传输到可写流(例如 stdout)或者只是通过 data 事件使用它:

const { Readable } = require("stream");

let count = 0;
const readableStream = new Readable({
    read(size) {
        this.push('foo');
        if (count === 5) this.push(null);
        count++;
    }
});

// piping
readableStream.pipe(process.stdout)

// through the data event
readableStream.on('data', (chunk) => {
  console.log(chunk.toString());
});

它们都应该打印 5 次 foo(虽然它们略有不同)。您应该使用哪一个取决于您要完成的任务。

Furthermore, these articles says you don't need to implement the read method:

您可能不需要它,这应该有效:

const { Readable } = require("stream");

const readableStream = new Readable();

for (let i = 0; i <= 5; i++) {
    readableStream.push('foo');
}
readableStream.push(null);

readableStream.pipe(process.stdout)

在这种情况下,您无法通过data 事件捕获它。此外,这种方式不是很有用而且效率不高我会说,我们只是一次将所有数据推送到流中(如果它很大,所有内容都会在内存中),然后使用它。

在您的 ReadableStream 初始化后实现 _read 方法:

import {Readable} from "stream";
this.readableStream = new Readable();
this.readableStream.read = function () {};

来自文档:

readable._read:

"当调用 readable._read() 时,如果资源中有可用数据,则实施应开始使用 this.push(dataChunk) 方法将该数据推送到读取队列中。link"

readable.push:

"readable.push() 方法只能由 Readable 实现者调用,并且只能从 readable._read() 方法中调用。link"

readableStream 就像一个池:

  • .push(data),就像抽水到池子里一样。
  • .pipe(destination), 就好比将水池连接到管道上,然后将水抽到其他地方
  • _read(size) 运行 作为泵并控制多少水流和数据何时结束。

fs.createReadStream() 将创建读取流,其中 _read() 函数已自动实现以推送文件数据并在文件结束时结束。

当池连接到管道时,_read(size) 会自动触发。因此,如果您在没有连接到目的地的方式的情况下强行调用此函数,它将泵送到“哪里”?它会影响 _read() 中的机器状态(可能是光标移动到错误的位置,...)

read() 函数必须在 new Stream.Readable() 中创建。它实际上是一个对象内部的函数。它不是 readableStream.read(),实现 readableStream.read=function(size){...} 将不起作用。

理解实现的简单方法:

var Reader=new Object();
Reader.read=function(size){
    if (this.i==null){this.i=1;}else{this.i++;}
    this.push("abc");
    if (this.i>7){ this.push(null); }
}

const Stream = require('stream');
const renderStream = new Stream.Readable(Reader);

renderStream.pipe(process.stdout)

您可以使用它来将要流式传输的数据 POST 重新发送到其他服务器。 POST 使用 Axios 流式传输数据:

require('axios')({
    method: 'POST',
    url: 'http://127.0.0.1:3000',
    headers: {'Content-Length': 1000000000000},
    data: renderStream
});