Node.js "request" 库是否支持异步可迭代响应流?

Does the Node.js "request" library support an async-iterable response stream?

我对 Node.js 库有点陌生,我正在尝试了解如何在 HTTP 响应流上使用异步迭代。我的总体目标是读取一个大的响应流并在块到达时对其进行处理,目前通过生成器函数。我无法将整个响应存储在内存中进行处理。

我正在使用 request 库来执行 HTTP 请求,如下所示。

const request = require("request");

// contrived chunk-by-chunk stream processing 
async function* getChunks(stream) {
  for await (const chunk of stream) {
    yield chunk[0];
  }
}

async function doWork() {
  var response = request.get("https://pastebin.com/raw/x4Nn0Tby");
  for await (c of getChunks(response)) {
    console.log(c);
  }
}

当我 运行 doWork() 时,我收到一条错误消息,指出 getChunks()stream 变量不可异步迭代。

类型错误:流不可异步迭代

这很令人惊讶,因为我认为所有可读流通常都是异步可迭代的,并且请求库 returns 在没有提供回调时是一个流。当我将 request.get(...) 替换为 fs.createReadStream(...) 到某个本地文件时,一切都按预期工作。

也许 request 库不支持这个。如果是这样,我需要做什么才能通过异步迭代处理 HTTP 响应流?

使用 Node.js 11.13 和 request 2.88.0.

似乎您将不得不使用其他替代方案,就像他们在 request 模块文档中提到的那样,您可以在此处找到 https://www.npmjs.com/package/request

request supports both streaming and callback interfaces natively. If you'd like 
request to return a Promise instead, you can use an alternative interface wrapper for 
request. These wrappers can be useful if you prefer to work with Promises, or if 
you'd like to use async/await in ES2017.

Several alternative interfaces are provided by the request team, including:

request-promise (uses Bluebird Promises)
request-promise-native (uses native Promises)
request-promise-any (uses any-promise Promises)`

我的回答基于以下问题:

我认为您可以创建 async await 执行此操作的自定义方法。

async function doMyWork() {
try {
 const response = await myOwnRequest(url); 
 } catch (e) {
   console.log ('the error', e);
 }  
}

function myOwnRequest(url) {
  return new Promise(function (resolve, reject) {
   const resp = request.get(url);
   if(resp) {
    resolve();
   } else {
     reject();
   }
});
}

我对 requestrequest-promise-native 库进行了更多试验,但认为在当前实施下这是不可能的。结果流似乎根本不是异步迭代的。此外,在处理流之前,正确的实现需要 await 以响应 return(如 所建议)。但是,如果您调用 await request.get(...),您将检索响应的全部内容,这对于大型响应而言是不可取的。

const r = require("request");
const rpn = require("request-promise-native");

// contrived chunk-by-chunk stream processing 
async function* getChunks(stream) {
  for await (const chunk of stream) {
    yield chunk[0];
  }
}

async function doWork() {
  const url = "https://pastebin.com/raw/x4Nn0Tby";
  const response = r.get(url);         // returns a non-async-iterable object.
  const response2 = await rp.get(url); // returns the contents of url

  for await (c of getChunks(response)) {  // yields response not async-iterable error.
    console.log(c);
  }
}

我对这个问题的解决方案是用 axios 库替换 requestrequest-promise-native 的使用。这些库在功能上相似,但 axios 允许您指定请求应解析为流;正如预期的那样,流是异步可迭代的。

const axios = require("axios");

async function doWork() {
  var response = await axios.request({
    method: "GET",
    url: "https://pastebin.com/raw/x4Nn0Tby",
    responseType: "stream",
  });

  for await (c of getChunks(response.data)) {  // async-iteration over response works as expected.
    console.log(c);
  }
}

简单的回答:不,不是。您可能希望在 request 周围使用基于承诺的包装器,例如 request-promise,它也适用于 async/await.

详细信息:请注意 requestdeprecated by its creator, and hence will be discontinued. This means, that sooner or later, you will most probably need to switch to another solution, such as axios, superagent or needle,仅举几例。

当然,评估这些模块并找出最适合您需求的模块取决于您,但我个人的建议是从 axios 开始,因为我过去对此有很好的体验, 然而, YMMV.

使用上述关于 axios 0.19.0 的答案中的示例代码,axios 的流选项对我不起作用。可能是椅子和键盘之间的问题,但无论如何......这里有一个使用 request 的替代方法。

我最终将请求流式传输到异步生成器(当然中间有一个缓冲区)。 这允许 "streaming" 类型的接口,其中可以交错读取和写入数据......它不能保证低内存消耗。尽可能快地请求管道 ("pushes") 到我们的 Writable 并且我们没有办法暂停它或将其转换为 "pull" 类型的接口(据我所知)。因此,如果我们从缓冲区中读取数据的速度比写入数据的速度慢:缓冲区会变得非常大,内存使用率也会很高。

因此,如果降低内存使用率很重要,并且您从 http 源解析大文件...那么可能在 "streaming" 的同时对缓冲区大小做一些 monitoring/reporting 以查看您是否消费代码比流更快或更慢,因此您知道缓冲区是变大还是变小。当然,如果您使用非常慢的 http 服务器进行测试...那么一切都将失败。

这可以通过设置固定缓冲区大小并使 _write 阻塞直到发生更多读取(在缓冲区中腾出空间)来解决...也就是说,请求必须等待写入更多数据管道。但是,请求可能会在内部缓冲...因此,如果数据无论如何都堆积在请求的末端,这将无助于内存消耗。必须检查一下。

示例代码:

const request = require('request'),
    Writable = require('stream').Writable,
    EventEmitter = require('events');

module.exports = function (url, MAX_BYTES=1024) {
    var response = new ResponseBuffer(MAX_BYTES);

    request
        .get(url)
        .on('error', function(err) { throw err; })
        .pipe(response)
        .on('error', function(err) { throw err; });

    return response.reader();
};

class ResponseBuffer extends Writable {
    constructor (MAX_BYTES=1024) {
        super();
        this.buffer = '';
        this.open = true;
        this.done = null;  // callback to call when done reading.
        this.MAX_BYTES = MAX_BYTES;
        this.events = new EventEmitter();
    }
    _write(chunk, enc, next) {
        this.buffer += chunk;
        this.events.emit('data');
        next();
    }
    _final(done) {
        this.open = false; // signal to reader to return after buffer empty.
        return done();
    }
    async * reader () {
        while (true) {
            if (this.buffer.length == 0) {
                // buffer empty and Writable !open. return.
                if (!this.open) { return; }
                else { // buffer empty.  wait for data.
                    await new Promise(resolve => this.events.once('data', resolve));
                }
            }
            let read_bytes = this.buffer.length < this.MAX_BYTES ? this.buffer.length : this.MAX_BYTES;
            yield this.buffer.slice(0, read_bytes);
            this.buffer = this.buffer.slice(read_bytes);
        }
    }
}

然后像这样使用它:


const httpModule = require('./path/to/above/module');
var httpGen = httpModule('https://www.google.com'),
    chunk;
for await (chunk of httpGen) {
    // do something with chunk.
}

另一种方法(如果您特别关心内存使用情况)是只下载到磁盘(流式传输到文件写入器)然后从磁盘增量读取(您可以异步 iter a fs.createReadStream(...)