为什么这个 readline 异步迭代器不能正常工作?

Why does this readline async iterator not work properly?

这是我在节点 v14.4.0 中提炼成最小的、可重现的示例的更大过程的一部分。在此代码中,它从 for 循环内部不输出任何内容。

我在控制台中只看到这个输出:

before for() loop
finished
finally
done

for await (const line1 of rl1) 循环永远不会进入 for 循环——它只是直接跳过它:

const fs = require('fs');
const readline = require('readline');
const { once } = require('events');

async function test(file1, file2) {
    try {
        const stream1 = fs.createReadStream(file1);
        await once(stream1, 'open');
        const rl1 = readline.createInterface({input: stream1, crlfDelay: Infinity});

        const stream2 = fs.createReadStream(file2);
        await once(stream2, 'open');
        const rl2 = readline.createInterface({input: stream2, crlfDelay: Infinity});

        console.log('before for() loop');
        for await (const line1 of rl1) {
            console.log(line1);
        }
        console.log('finished');
    } finally {
        console.log('finally');
    }
}

test("data/numbers.txt", "data/letters.txt").then(() => {
    console.log(`done`);
}).catch(err => {
    console.log('Got rejected promise:', err);
})

但是,如果我删除任何一个 await once(stream, 'open') 语句,那么 for 循环将完全按照预期执行(列出 rl1 文件的所有行)。因此,显然,异步迭代器与流之间的 readline 接口存在一些计时问题。任何想法可能会发生什么。知道是什么导致了这个问题或如何解决它吗?

仅供参考,await once(stream, 'open') 存在是因为异步迭代器中的另一个错误,如果打开文件时出现问题,它不会拒绝,而 await once(stream, 'open') 会导致您正确地获得拒绝,如果无法打开文件(本质上是预先打开)。

如果您想知道为什么会有 stream2 代码,它在较大的项目中使用,但我已将此示例缩减为最小的、可重现的示例,并且只需要这么多代码来演示问题。


编辑: 在尝试稍微不同的实现时,我发现如果我将两个 once(stream, "open") 调用组合在一个 Promise.all() 中,它就会起作用.所以,这有效:

const fs = require('fs');
const readline = require('readline');
const { once } = require('events');


async function test(file1, file2) {
    try {
        const stream1 = fs.createReadStream(file1);
        const rl1 = readline.createInterface({input: stream1, crlfDelay: Infinity});
        const stream2 = fs.createReadStream(file2);
        const rl2 = readline.createInterface({input: stream2, crlfDelay: Infinity});
        // pre-flight file open to catch any open errors here
        // because of existing bug in async iterator with file open errors
        await Promise.all([once(stream1, "open"), once(stream2, "open")]);

        console.log('before for() loop');
        for await (const line1 of rl1) {
            console.log(line1);
        }
        console.log('finished');
    } finally {
        console.log('finally');
    }
}

test("data/numbers.txt", "data/letters.txt").then(() => {
    console.log(`done`);
}).catch(err => {
    console.log('Got rejected promise:', err);
});

这显然不应该对您等待文件打开的确切方式敏感。某处存在一些计时错误。我想在 readline 或 readStream 上找到该错误并将其归档。有什么想法吗?

如果您在构造 readline 接口后立即创建异步迭代器,则可以使它按预期工作。如果您等待创建异步迭代器,您可能会丢失一些行,因为行事件不会被 readline 接口缓冲,但由于异步迭代器,它们将被缓冲。

const fs = require('fs');
const readline = require('readline');
const { once } = require('events');

async function test(file1, file2) {
    try {
        const stream1 = fs.createReadStream(file1);
        await once(stream1, 'open');
        const rl1 = readline.createInterface({input: stream1, crlfDelay: Infinity});

        const rl1Iterator = rl1[Symbol.asyncIterator]();

        const stream2 = fs.createReadStream(file2);
        await once(stream2, 'open');
        const rl2 = readline.createInterface({input: stream2, crlfDelay: Infinity});

        console.log('before for() loop');
        for await (const line1 of rl1Iterator) {
            console.log(line1);
        }
        console.log('finished');
    } finally {
        console.log('finally');
    }
}

test("stream.txt", "stream.txt").then(() => {
    console.log(`done`);
}).catch(err => {
    console.log('Got rejected promise:', err);
})

根据评论中的讨论,这可能仍然不是一个理想的解决方案,因为 readline 模块还有其他各种问题,但我想我会添加一个答案来解决原始问题中指出的问题。

事实证明,潜在的问题是 readline.createInterface() 立即调用它将添加一个 data 事件侦听器 (code reference here) 并恢复流以开始流。

input.on('data', ondata);

input.resume();

然后,在 ondata 侦听器中,它解析行数据,并在找到行时触发 line 事件 here.

for (let n = 0; n < lines.length; n++)
  this._onLine(lines[n]);

但是,在我的示例中,在调用 readline.createInterface() 和创建异步迭代器(将侦听 line 事件)之间发生了其他异步事件。因此,line 事件正在发出,但还没有任何东西在监听它们。

因此,要正常工作 readline.createInterface() 需要在调用 readline.createInterface() 之后同步添加要监听 line 事件的任何内容,否则存在竞争条件并且 line 事件可能会丢失。


在我的原始代码示例中,一种可靠的变通方法是在我完成 await once(...) 之前不要调用 readline.createInterface()。然后,异步迭代器将在 readline.createInterface() 被调用后同步创建。

const fs = require('fs');
const readline = require('readline');
const { once } = require('events');


async function test(file1, file2) {
    try {
        const stream1 = fs.createReadStream(file1);
        const stream2 = fs.createReadStream(file2);
        // wait for both files to be open to catch any "open" errors here
        // since readline has bugs about not properly reporting file open errors
        // this await must be done before either call to readline.createInterface()
        // to avoid race conditions that can lead to lost lines of data
        await Promise.all([once(stream1, "open"), once(stream2, "open")]);

        const rl1 = readline.createInterface({input: stream1, crlfDelay: Infinity});
        const rl2 = readline.createInterface({input: stream2, crlfDelay: Infinity});

        console.log('before for() loop');
        for await (const line1 of rl1) {
            console.log(line1);
        }
        console.log('finished');
    } finally {
        console.log('finally');
    }
}

test("data/numbers.txt", "data/letters.txt").then(() => {
    console.log(`done`);
}).catch(err => {
    console.log('Got rejected promise:', err);
});

解决此一般问题的一种方法是更改​​ readline.createInterface(),使其不添加 data 事件并恢复流,直到有人添加 line 事件侦听器。这将防止数据丢失。它将允许 readline 接口对象静静地坐在那里而不会丢失数据,直到其输出的接收者真正准备好为止。这将适用于异步迭代器,并且还可以防止混合了其他异步代码的接口的其他使用可能丢失 line 个事件。

关于此的注释已添加到相关的 open readline 错误问题 here

readline 模块也可以使用更现代的流 API 替换为简单的 Transform 流。现代流 API 支持开箱即用的异步迭代器以及背压(例如,流的写入端(文件读取)将暂停,直到流的读取端(行读取)被消耗)。

const fs = require('fs');
const { Transform } = require('stream');

function toLines() {
    let remaining = '';
    return new Transform({
        writableObjectMode: false,
        readableObjectMode: true,
        transform(chunk, encoding, callback) {
            try {
                const lines = (remaining + chunk).split(/\r?\n/g);
                remaining = lines.pop();
                for (const line of lines) {
                    this.push(line);
                }
                callback();
            } catch (err) {
                callback(err);
            }
        },
        flush(callback) {
            if (remaining !== '') {
                this.push(remaining);
            }
            callback();
        }
    });
}


async function test(file1, file2) {
    try {
        const stream1 = fs.createReadStream(file1, { encoding: 'utf8' });
        const rl1 = stream1.pipe(toLines());

        const stream2 = fs.createReadStream(file2, { encoding: 'utf8' });
        const rl2 = stream2.pipe(toLines());

        console.log('before for() loop');
        for await (const line1 of rl1) {
            console.log(line1);
        }
        console.log('finished');
    } finally {
        console.log('finally');
    }
}

此示例不支持 readline 模块的 crlfDelay 选项,但可以修改算法以执行类似的操作。它还(据我所知)比 readline 模块支持的错误处理更好。