为什么这个 readline 异步迭代器不能正常工作?
Why does this readline async iterator not work properly?
这是我在节点 v14.4.0 中提炼成最小的、可重现的示例的更大过程的一部分。在此代码中,它从 for
循环内部不输出任何内容。
我在控制台中只看到这个输出:
before for() loop
finished
finally
done
for await (const line1 of rl1)
循环永远不会进入 for
循环——它只是直接跳过它:
const fs = require('fs');
const readline = require('readline');
const { once } = require('events');
async function test(file1, file2) {
try {
const stream1 = fs.createReadStream(file1);
await once(stream1, 'open');
const rl1 = readline.createInterface({input: stream1, crlfDelay: Infinity});
const stream2 = fs.createReadStream(file2);
await once(stream2, 'open');
const rl2 = readline.createInterface({input: stream2, crlfDelay: Infinity});
console.log('before for() loop');
for await (const line1 of rl1) {
console.log(line1);
}
console.log('finished');
} finally {
console.log('finally');
}
}
test("data/numbers.txt", "data/letters.txt").then(() => {
console.log(`done`);
}).catch(err => {
console.log('Got rejected promise:', err);
})
但是,如果我删除任何一个 await once(stream, 'open')
语句,那么 for
循环将完全按照预期执行(列出 rl1
文件的所有行)。因此,显然,异步迭代器与流之间的 readline 接口存在一些计时问题。任何想法可能会发生什么。知道是什么导致了这个问题或如何解决它吗?
仅供参考,await once(stream, 'open')
存在是因为异步迭代器中的另一个错误,如果打开文件时出现问题,它不会拒绝,而 await once(stream, 'open')
会导致您正确地获得拒绝,如果无法打开文件(本质上是预先打开)。
如果您想知道为什么会有 stream2 代码,它在较大的项目中使用,但我已将此示例缩减为最小的、可重现的示例,并且只需要这么多代码来演示问题。
编辑: 在尝试稍微不同的实现时,我发现如果我将两个 once(stream, "open")
调用组合在一个 Promise.all()
中,它就会起作用.所以,这有效:
const fs = require('fs');
const readline = require('readline');
const { once } = require('events');
async function test(file1, file2) {
try {
const stream1 = fs.createReadStream(file1);
const rl1 = readline.createInterface({input: stream1, crlfDelay: Infinity});
const stream2 = fs.createReadStream(file2);
const rl2 = readline.createInterface({input: stream2, crlfDelay: Infinity});
// pre-flight file open to catch any open errors here
// because of existing bug in async iterator with file open errors
await Promise.all([once(stream1, "open"), once(stream2, "open")]);
console.log('before for() loop');
for await (const line1 of rl1) {
console.log(line1);
}
console.log('finished');
} finally {
console.log('finally');
}
}
test("data/numbers.txt", "data/letters.txt").then(() => {
console.log(`done`);
}).catch(err => {
console.log('Got rejected promise:', err);
});
这显然不应该对您等待文件打开的确切方式敏感。某处存在一些计时错误。我想在 readline 或 readStream 上找到该错误并将其归档。有什么想法吗?
如果您在构造 readline 接口后立即创建异步迭代器,则可以使它按预期工作。如果您等待创建异步迭代器,您可能会丢失一些行,因为行事件不会被 readline 接口缓冲,但由于异步迭代器,它们将被缓冲。
const fs = require('fs');
const readline = require('readline');
const { once } = require('events');
async function test(file1, file2) {
try {
const stream1 = fs.createReadStream(file1);
await once(stream1, 'open');
const rl1 = readline.createInterface({input: stream1, crlfDelay: Infinity});
const rl1Iterator = rl1[Symbol.asyncIterator]();
const stream2 = fs.createReadStream(file2);
await once(stream2, 'open');
const rl2 = readline.createInterface({input: stream2, crlfDelay: Infinity});
console.log('before for() loop');
for await (const line1 of rl1Iterator) {
console.log(line1);
}
console.log('finished');
} finally {
console.log('finally');
}
}
test("stream.txt", "stream.txt").then(() => {
console.log(`done`);
}).catch(err => {
console.log('Got rejected promise:', err);
})
根据评论中的讨论,这可能仍然不是一个理想的解决方案,因为 readline 模块还有其他各种问题,但我想我会添加一个答案来解决原始问题中指出的问题。
事实证明,潜在的问题是 readline.createInterface()
立即调用它将添加一个 data
事件侦听器 (code reference here) 并恢复流以开始流。
input.on('data', ondata);
和
input.resume();
然后,在 ondata
侦听器中,它解析行数据,并在找到行时触发 line
事件 here.
for (let n = 0; n < lines.length; n++)
this._onLine(lines[n]);
但是,在我的示例中,在调用 readline.createInterface()
和创建异步迭代器(将侦听 line
事件)之间发生了其他异步事件。因此,line
事件正在发出,但还没有任何东西在监听它们。
因此,要正常工作 readline.createInterface()
需要在调用 readline.createInterface()
之后同步添加要监听 line
事件的任何内容,否则存在竞争条件并且 line
事件可能会丢失。
在我的原始代码示例中,一种可靠的变通方法是在我完成 await once(...)
之前不要调用 readline.createInterface()
。然后,异步迭代器将在 readline.createInterface()
被调用后同步创建。
const fs = require('fs');
const readline = require('readline');
const { once } = require('events');
async function test(file1, file2) {
try {
const stream1 = fs.createReadStream(file1);
const stream2 = fs.createReadStream(file2);
// wait for both files to be open to catch any "open" errors here
// since readline has bugs about not properly reporting file open errors
// this await must be done before either call to readline.createInterface()
// to avoid race conditions that can lead to lost lines of data
await Promise.all([once(stream1, "open"), once(stream2, "open")]);
const rl1 = readline.createInterface({input: stream1, crlfDelay: Infinity});
const rl2 = readline.createInterface({input: stream2, crlfDelay: Infinity});
console.log('before for() loop');
for await (const line1 of rl1) {
console.log(line1);
}
console.log('finished');
} finally {
console.log('finally');
}
}
test("data/numbers.txt", "data/letters.txt").then(() => {
console.log(`done`);
}).catch(err => {
console.log('Got rejected promise:', err);
});
解决此一般问题的一种方法是更改 readline.createInterface()
,使其不添加 data
事件并恢复流,直到有人添加 line
事件侦听器。这将防止数据丢失。它将允许 readline 接口对象静静地坐在那里而不会丢失数据,直到其输出的接收者真正准备好为止。这将适用于异步迭代器,并且还可以防止混合了其他异步代码的接口的其他使用可能丢失 line
个事件。
关于此的注释已添加到相关的 open readline 错误问题 here。
readline
模块也可以使用更现代的流 API 替换为简单的 Transform
流。现代流 API 支持开箱即用的异步迭代器以及背压(例如,流的写入端(文件读取)将暂停,直到流的读取端(行读取)被消耗)。
const fs = require('fs');
const { Transform } = require('stream');
function toLines() {
let remaining = '';
return new Transform({
writableObjectMode: false,
readableObjectMode: true,
transform(chunk, encoding, callback) {
try {
const lines = (remaining + chunk).split(/\r?\n/g);
remaining = lines.pop();
for (const line of lines) {
this.push(line);
}
callback();
} catch (err) {
callback(err);
}
},
flush(callback) {
if (remaining !== '') {
this.push(remaining);
}
callback();
}
});
}
async function test(file1, file2) {
try {
const stream1 = fs.createReadStream(file1, { encoding: 'utf8' });
const rl1 = stream1.pipe(toLines());
const stream2 = fs.createReadStream(file2, { encoding: 'utf8' });
const rl2 = stream2.pipe(toLines());
console.log('before for() loop');
for await (const line1 of rl1) {
console.log(line1);
}
console.log('finished');
} finally {
console.log('finally');
}
}
此示例不支持 readline
模块的 crlfDelay
选项,但可以修改算法以执行类似的操作。它还(据我所知)比 readline
模块支持的错误处理更好。
这是我在节点 v14.4.0 中提炼成最小的、可重现的示例的更大过程的一部分。在此代码中,它从 for
循环内部不输出任何内容。
我在控制台中只看到这个输出:
before for() loop
finished
finally
done
for await (const line1 of rl1)
循环永远不会进入 for
循环——它只是直接跳过它:
const fs = require('fs');
const readline = require('readline');
const { once } = require('events');
async function test(file1, file2) {
try {
const stream1 = fs.createReadStream(file1);
await once(stream1, 'open');
const rl1 = readline.createInterface({input: stream1, crlfDelay: Infinity});
const stream2 = fs.createReadStream(file2);
await once(stream2, 'open');
const rl2 = readline.createInterface({input: stream2, crlfDelay: Infinity});
console.log('before for() loop');
for await (const line1 of rl1) {
console.log(line1);
}
console.log('finished');
} finally {
console.log('finally');
}
}
test("data/numbers.txt", "data/letters.txt").then(() => {
console.log(`done`);
}).catch(err => {
console.log('Got rejected promise:', err);
})
但是,如果我删除任何一个 await once(stream, 'open')
语句,那么 for
循环将完全按照预期执行(列出 rl1
文件的所有行)。因此,显然,异步迭代器与流之间的 readline 接口存在一些计时问题。任何想法可能会发生什么。知道是什么导致了这个问题或如何解决它吗?
仅供参考,await once(stream, 'open')
存在是因为异步迭代器中的另一个错误,如果打开文件时出现问题,它不会拒绝,而 await once(stream, 'open')
会导致您正确地获得拒绝,如果无法打开文件(本质上是预先打开)。
如果您想知道为什么会有 stream2 代码,它在较大的项目中使用,但我已将此示例缩减为最小的、可重现的示例,并且只需要这么多代码来演示问题。
编辑: 在尝试稍微不同的实现时,我发现如果我将两个 once(stream, "open")
调用组合在一个 Promise.all()
中,它就会起作用.所以,这有效:
const fs = require('fs');
const readline = require('readline');
const { once } = require('events');
async function test(file1, file2) {
try {
const stream1 = fs.createReadStream(file1);
const rl1 = readline.createInterface({input: stream1, crlfDelay: Infinity});
const stream2 = fs.createReadStream(file2);
const rl2 = readline.createInterface({input: stream2, crlfDelay: Infinity});
// pre-flight file open to catch any open errors here
// because of existing bug in async iterator with file open errors
await Promise.all([once(stream1, "open"), once(stream2, "open")]);
console.log('before for() loop');
for await (const line1 of rl1) {
console.log(line1);
}
console.log('finished');
} finally {
console.log('finally');
}
}
test("data/numbers.txt", "data/letters.txt").then(() => {
console.log(`done`);
}).catch(err => {
console.log('Got rejected promise:', err);
});
这显然不应该对您等待文件打开的确切方式敏感。某处存在一些计时错误。我想在 readline 或 readStream 上找到该错误并将其归档。有什么想法吗?
如果您在构造 readline 接口后立即创建异步迭代器,则可以使它按预期工作。如果您等待创建异步迭代器,您可能会丢失一些行,因为行事件不会被 readline 接口缓冲,但由于异步迭代器,它们将被缓冲。
const fs = require('fs');
const readline = require('readline');
const { once } = require('events');
async function test(file1, file2) {
try {
const stream1 = fs.createReadStream(file1);
await once(stream1, 'open');
const rl1 = readline.createInterface({input: stream1, crlfDelay: Infinity});
const rl1Iterator = rl1[Symbol.asyncIterator]();
const stream2 = fs.createReadStream(file2);
await once(stream2, 'open');
const rl2 = readline.createInterface({input: stream2, crlfDelay: Infinity});
console.log('before for() loop');
for await (const line1 of rl1Iterator) {
console.log(line1);
}
console.log('finished');
} finally {
console.log('finally');
}
}
test("stream.txt", "stream.txt").then(() => {
console.log(`done`);
}).catch(err => {
console.log('Got rejected promise:', err);
})
根据评论中的讨论,这可能仍然不是一个理想的解决方案,因为 readline 模块还有其他各种问题,但我想我会添加一个答案来解决原始问题中指出的问题。
事实证明,潜在的问题是 readline.createInterface()
立即调用它将添加一个 data
事件侦听器 (code reference here) 并恢复流以开始流。
input.on('data', ondata);
和
input.resume();
然后,在 ondata
侦听器中,它解析行数据,并在找到行时触发 line
事件 here.
for (let n = 0; n < lines.length; n++)
this._onLine(lines[n]);
但是,在我的示例中,在调用 readline.createInterface()
和创建异步迭代器(将侦听 line
事件)之间发生了其他异步事件。因此,line
事件正在发出,但还没有任何东西在监听它们。
因此,要正常工作 readline.createInterface()
需要在调用 readline.createInterface()
之后同步添加要监听 line
事件的任何内容,否则存在竞争条件并且 line
事件可能会丢失。
在我的原始代码示例中,一种可靠的变通方法是在我完成 await once(...)
之前不要调用 readline.createInterface()
。然后,异步迭代器将在 readline.createInterface()
被调用后同步创建。
const fs = require('fs');
const readline = require('readline');
const { once } = require('events');
async function test(file1, file2) {
try {
const stream1 = fs.createReadStream(file1);
const stream2 = fs.createReadStream(file2);
// wait for both files to be open to catch any "open" errors here
// since readline has bugs about not properly reporting file open errors
// this await must be done before either call to readline.createInterface()
// to avoid race conditions that can lead to lost lines of data
await Promise.all([once(stream1, "open"), once(stream2, "open")]);
const rl1 = readline.createInterface({input: stream1, crlfDelay: Infinity});
const rl2 = readline.createInterface({input: stream2, crlfDelay: Infinity});
console.log('before for() loop');
for await (const line1 of rl1) {
console.log(line1);
}
console.log('finished');
} finally {
console.log('finally');
}
}
test("data/numbers.txt", "data/letters.txt").then(() => {
console.log(`done`);
}).catch(err => {
console.log('Got rejected promise:', err);
});
解决此一般问题的一种方法是更改 readline.createInterface()
,使其不添加 data
事件并恢复流,直到有人添加 line
事件侦听器。这将防止数据丢失。它将允许 readline 接口对象静静地坐在那里而不会丢失数据,直到其输出的接收者真正准备好为止。这将适用于异步迭代器,并且还可以防止混合了其他异步代码的接口的其他使用可能丢失 line
个事件。
关于此的注释已添加到相关的 open readline 错误问题 here。
readline
模块也可以使用更现代的流 API 替换为简单的 Transform
流。现代流 API 支持开箱即用的异步迭代器以及背压(例如,流的写入端(文件读取)将暂停,直到流的读取端(行读取)被消耗)。
const fs = require('fs');
const { Transform } = require('stream');
function toLines() {
let remaining = '';
return new Transform({
writableObjectMode: false,
readableObjectMode: true,
transform(chunk, encoding, callback) {
try {
const lines = (remaining + chunk).split(/\r?\n/g);
remaining = lines.pop();
for (const line of lines) {
this.push(line);
}
callback();
} catch (err) {
callback(err);
}
},
flush(callback) {
if (remaining !== '') {
this.push(remaining);
}
callback();
}
});
}
async function test(file1, file2) {
try {
const stream1 = fs.createReadStream(file1, { encoding: 'utf8' });
const rl1 = stream1.pipe(toLines());
const stream2 = fs.createReadStream(file2, { encoding: 'utf8' });
const rl2 = stream2.pipe(toLines());
console.log('before for() loop');
for await (const line1 of rl1) {
console.log(line1);
}
console.log('finished');
} finally {
console.log('finally');
}
}
此示例不支持 readline
模块的 crlfDelay
选项,但可以修改算法以执行类似的操作。它还(据我所知)比 readline
模块支持的错误处理更好。