Node.js 从多个可读流读取时停止

Node.js stops when reading from multiple Readable streams

创建一个流(A),创建另一个流(B)和读取流(B)后,读取过程从流(A)停止。 我该如何解决这个问题?

Node.js v14.18.1

import * as readline from 'readline';
import { Readable } from 'stream';

async function  main() {

    const  streamA = Readable.from('a');
    const  readerA = readline.createInterface({
        input: streamA,
        crlfDelay: Infinity
    });

    var  stopCase = false;
    if (stopCase) {

        const  streamB = Readable.from('b');
        const  readerB = readline.createInterface({
            input: streamB,
            crlfDelay: Infinity
        });

        console.log('readB');
        for await (const line of readerB) {
            console.log(line);
        }
    }
    console.log(`readerA.closed = ${'closed' in readerA}`);

    console.log('readA');
    for await (const line of readerA) {
        console.log(line);
    }
    console.log('success');
}
main();

输出(stopCase=true):

readB
b
readerA.closed = true
readA

输出(stopCase=false):

readerA.closed = false
readA
a
success

问题是一旦您这样做:

const readerA = readline.createInterface({
    input: streamA,
    crlfDelay: Infinity
});

然后,streamA 现在已准备好流动,并且 readerA 已准备好在您进入事件循环后立即生成事件。当您进入 stopCase 块并点击 for await (const line of readerB) 时,这将允许 streamA 流动,从而允许 readerA 触发事件。

但是,当 readerA 事件触发时,您并没有监听它们,因此它完成了您未监听时的 streamA 内容。

如果在完成 stopCase 块之前不创建 readerA,您会发现它的效果如何更好。因为当您点击 stopCase 块内部的 await 时,streamAreaderA 还没有流动。

这就是我所说的 成长的痛苦 由于试图将承诺添加到事件驱动的流而导致的。如果您让流处于流动状态并且您打算使用 await 来读取这些事件,但是您随后 await 一些其他承诺,那么当您不这样做时,第一个流上的所有事件都会触发还在听它不知道您正在等待对它使用 await。您将其设置为流动,因此一旦解释器命中事件循环,它就会开始流动,即使您没有使用 await.

收听

我之前在自己的代码中已经 运行 解决了这个问题,解决方案是在您即将使用 await 阅读它或直到你有一个更传统的事件处理程序配置为监听任何流动的事件。基本上,您不能同时配置两个流与 for await (...) 一起使用。配置一个流,将其与您的 for await (...) 一起使用,然后再配置另一个。并且,还要注意在处理 for await (...) 循环时使用的任何其他承诺。使用该结构时有很多方法可以搞砸。

在我看来,如果将流实际置于不同的状态以与 promise 一起使用,它会更可靠地工作,因此它只会通过 promise 接口流动。那么,这种事情就不会发生了。但是,我确信该实施也存在许多挑战。

例如,如果您这样做:

import * as readline from 'readline';
import { Readable } from 'stream';

async function main() {
    var stopCase = true;
    console.log(`stopCase = ${stopCase}`);
    if (stopCase) {

        const streamB = Readable.from('b');
        const readerB = readline.createInterface({
            input: streamB,
            crlfDelay: Infinity
        });

        console.log('readB');
        for await (const line of readerB) {
            console.log(line);
        }
    }
    const streamA = Readable.from('a');
    const readerA = readline.createInterface({
        input: streamA,
        crlfDelay: Infinity
    });
    console.log(`streamA flowing = ${streamA.readableFlowing}`);
    console.log(`readerA.closed = ${!!readerA.closed}`);

    console.log('readA');
    for await (const line of readerA) {
        console.log(line);
    }
    console.log('success');
}
main();

然后,你得到所有的输出:

stopCase = true
readB
b
streamA flowing = true
readerA.closed = false
readA
a
success

你从来没有得到 console.log('success') 的原因可能是因为你点击了 for await (const line of readerA) { ...} 循环并且它在没有更多数据的承诺下停止在那里。同时,nodejs 注意到进程中没有任何东西可以创建任何未来事件,因此它退出了进程。

您可以在更简单的应用程序中看到相同的概念:

async function main() {
    await new Promise(resolve => {
        // do nothing
    });
    console.log('success');
}
main();

它等待一个永远不会完成的承诺,并且应用程序中没有创建任何事件,因此 nodejs 只是关闭并记录 success