如何处理来自 fs readline.Interface 异步迭代器的错误
How to handle error from fs readline.Interface async iterator
基于 processLineByLine() 的示例,我注意到如果给定的文件名不存在,我们将无法捕获错误。在这种情况下,程序以类似以下内容结束:
UnhandledPromiseRejectionWarning: Error: ENOENT: no such file or directory
因此,我采用的引发可捕获错误的最简单方法是对 processLineByLine()
函数进行 2 次修改:
- 将其放入生成器中,例如
function*
await
文件存在检查 await access(filename, fs.constants.F_OK)
最后我不得不将 readline.Interface
实例转换为异步生成器。 我特别不喜欢这最后一部分。生成的 lines()
函数如下:
export async function* lines(filename) {
await access(filename, fs.constants.F_OK)
const lines = readline.createInterface({
input: fs.createReadStream(filename),
crlfDelay: Infinity
})
for await (const l of lines) {
yield l
}
}
问题:是否有更好的方法使 lines()
成为异步迭代器或在文件名不存在时抛出错误?
BUG 报告: 关于@jfriend00 的观察,我在 nodejs 上打开了一个 Bug 问题:https://github.com/nodejs/node/issues/30831
嗯,这是一个棘手的问题。即使在飞行前检测文件是否存在也不能保证您可以成功打开它(它可能被锁定或有权限问题)并且在打开之前检测它是否存在是服务器开发中的经典竞争条件(小 window,但仍然是竞争条件)。
我仍然认为必须有更好的方法来从 fs.createReadStream()
中获取错误,但我能找到的唯一方法是将其包装在一个承诺中,该承诺仅在文件是成功打开。这使您可以从打开文件中获取错误并将其传播回 async
函数的调用者。这是它的样子:
const fs = require('fs');
const readline = require('readline');
function createReadStreamSafe(filename, options) {
return new Promise((resolve, reject) => {
const fileStream = fs.createReadStream(filename, options);
fileStream.on('error', reject).on('open', () => {
resolve(filestream);
});
});
}
async function processLineByLine(f) {
const fileStream = await createReadStreamSafe(f);
const rl = readline.createInterface({
input: fileStream,
crlfDelay: Infinity
});
for await (const line of rl) {
// Each line in input.txt will be successively available here as `line`.
console.log(`Line from file: ${line}`);
}
}
processLineByLine("nofile").catch(err => {
console.log("caught error");
});
这使得 processLineByLine()
return 的承诺将被拒绝,您可以在那里处理错误,这正是我认为您所要求的。如果我误解了你的要求,请澄清。
仅供参考,在我看来,这似乎是 readline.createInterface()
中的一个错误,因为它似乎应该在 for await (const line of rl)
的第一次迭代中拒绝,但事实并非如此。
因此,即使是这种解决方法也不会在流打开后检测到读取错误。这确实需要在 createInterface()
内部修复。我同意文件打开错误或读取错误都应在 for await (const line of rl)
.
上显示为拒绝
文件打开问题的另一种解决方法是使用 await fs.promises.open(...)
预打开文件并将 fd
传递给 fs.createReadStream
然后您会看到错误打开自己。
不同的解决方案 - 包装 readLine 迭代器以添加错误处理
警告,这看起来有点 hack,但这是一个非常有趣的学习项目,因为我最终不得不用自己的 readline asyncIterator
包装起来,以便在我检测到readStream
错误(缺少 readline
库的错误处理)。
我的任务是弄清楚如何编写一个 processLineByLine()
函数,该函数将 return 和 asyncIterator
正确拒绝流错误(即使 readline
代码在这方面有错误),同时仍在内部使用 readline 库。
目标是能够像这样编写代码:
for await (let line of processLineByLine("somefile1.txt")) {
console.log(line);
}
正确处理了内部使用的readStream的错误,无论是文件不存在,存在但打不开,甚至后面读取时遇到读取错误。由于我不是 changing/fixing 内部的 readline 接口代码,我必须在 readStream 上安装我自己的 error
侦听器,当我在那里看到错误时,我需要从readline接口拒绝。
这是我最终得到的结果:
// This is an experiment to wrap the lines asyncIterator with our own iterator
// so we can reject when there's been an error on the readStream. It's really
// ugly, but does work.
const fs = require('fs');
const readline = require('readline');
function processLineByLine(filename, options = {}) {
const fileStream = fs.createReadStream(filename, options);
let latchedError = null;
let kill = new Set();
fileStream.on('error', (err) => {
latchedError = err;
// any open promises waiting on this stream, need to get rejected now
for (let fn of kill) {
fn(err);
}
});
const lines = readline.createInterface({
input: fileStream,
crlfDelay: Infinity
});
// create our own little asyncIterator that wraps the lines asyncIterator
// so we can reject when we need to
function asyncIterator() {
const linesIterator = lines[Symbol.asyncIterator]();
return {
next: function() {
if (latchedError) {
return Promise.reject(latchedError);
} else {
return new Promise((resolve, reject) => {
// save reject handlers in higher scope so they can be called
// from the stream error handler
kill.add(reject);
let p = linesIterator.next();
// have our higher level promise track the iterator promise
// except when we reject it from the outside upon stream error
p.then((data => {
// since we're resolving now, let's removing our reject
// handler from the kill storage. This will allow this scope
// to be properly garbage collected
kill.delete(reject);
resolve(data);
}), reject);
});
}
}
}
}
var asyncIterable = {
[Symbol.asyncIterator]: asyncIterator
};
return asyncIterable;
}
async function runIt() {
for await (let line of processLineByLine("xfile1.txt")) {
console.log(line);
}
}
runIt().then(() => {
console.log("done");
}).catch(err => {
console.log("final Error", err);
});
有关其工作原理的一些解释...
我们自己在流上的错误监控
首先,你可以看到这个:
fileStream.on('error', (err) => {
latchedError = err;
// any open promises waiting on this stream, need to get rejected now
for (let fn of kill) {
fn(err);
}
});
这是我们自己在readStream上的错误监控,弥补了readline内部错误处理的缺失。任何时候我们看到一个错误,我们将它保存在一个更高范围的变量中以供以后使用,如果有任何从 readline 为这个流注册的未决承诺,我们 "kill" 他们(拒绝他们,你稍后会看到如何有效)。
文件打开错误没有特殊处理
这里的部分目标是摆脱先前解决方案中对文件打开错误的特殊处理。我们希望 readStream 上的任何错误都触发对 asyncIterable 的拒绝,因此这是一种更通用的机制。文件打开错误在此错误处理中被捕获,就像任何其他读取错误一样。
我们自己的asyncIterable和asyncIterator
调用 readline.createInterace()
return 一个 asyncIterable。它与常规可迭代对象基本相同,您可以在其上调用特殊的 属性 以获得 asyncIterator
。 asyncIterator
有一个 .next()
属性 就像一个普通的迭代器一样,除了当 asyncIterator.next()
被调用时,它 return 是一个解析为对象而不是对象的承诺一个对象。
所以,这就是 for await (let line of lines)
的工作原理。它首先调用 lines[Symbol.asyncIterator]()
来获取一个 asyncIterator。然后,在返回的 asyncIterator
上,它重复 await asyncIterator.next()
等待 asyncIterator.next()
returns.
的承诺
现在,readline.createInterface()
已经 return 这样的 asyncIterable
。但是,它不能正常工作。当 readStream
出现错误时,它不会在每次迭代时拒绝 .next()
编辑的承诺 return。事实上,这个承诺永远不会被拒绝或解决。所以,事情陷入僵局。在我的测试应用程序中,应用程序将只是退出,因为 readStream 已完成(在错误之后)并且不再有任何东西阻止应用程序退出,即使 promise 仍然悬而未决。
因此,我需要一种方法来强制拒绝 readlineIterator.next()
之前 return 编辑并且目前正被 for await (...)
等待的承诺。好吧,promise 不提供用于拒绝它的外部接口,而且我们无法访问可以拒绝它的 readline
实现的内部。
我的解决方案是用我自己的 readlineIterator 包装 readlineIterator 作为一种代理。然后,我们自己的错误检测器发现错误,并且 readline 中有未完成的承诺,我可以使用我的 proxy/wrapper 强制拒绝那些未完成的承诺。这将导致 for await (...)
看到拒绝并得到正确的错误。而且,它有效。
我花了一段时间才充分了解 asyncIterators
的工作原理,以便能够包装一个。我非常感谢这篇 Asynchronous Iterators in JavaScript 文章,它为构建您自己的 asyncIterable 和 asyncIterator 提供了一些非常有用的代码示例。这实际上是本练习中真正学习的地方,其他人可能会通过理解上面代码中的工作原理来学习。
强制拒绝包装承诺
此代码中的 "ugliness" 强制承诺从该承诺的拒绝处理程序的通常范围之外拒绝。这是通过将拒绝处理程序存储在更高级别的范围内来完成的,其中 readStream
的错误处理可以调用承诺拒绝的触发器。可能有一种更优雅的编码方式,但这行得通。
制作我们自己的 asyncIterable
异步可迭代对象只是一个对象,上面有一个名为 [Symbol.asyncIterator]
的 属性。 属性 必须是一个函数,当不带参数调用时,return 是一个 asyncIterator.
所以,这是我们的 asyncIterable
.
var asyncIterable = {
[Symbol.asyncIterator]: asyncIterator
};
制作我们自己的异步迭代器
asyncIterator
是一个函数,调用时 return 是一个带有 next()
属性 的对象。每次 obj.next()
被调用时,它 return 是一个解析为通常的迭代器元组对象 {done, value}
的承诺。我们不必担心已解析的值,因为我们将从 readline 的迭代器中获取它。所以,这是我们的 asyncIterator
:
// create our own little asyncIterator that wraps the lines asyncIterator
// so we can reject when we need to
function asyncIterator() {
const linesIterator = lines[Symbol.asyncIterator]();
return {
next: function() {
if (latchedError) {
return Promise.reject(latchedError);
} else {
return new Promise((resolve, reject) => {
// save reject handlers in higher scope so they can be called
// from the stream error handler
kill.push(reject);
let p = linesIterator.next();
// have our higher level promise track the iterator promise
// except when we reject it from the outside upon stream error
p.then(resolve, reject);
});
}
}
}
}
首先,它从 readline 接口(我们是 proxying/wrapping 的接口)获取 asyncIterator 并将其存储在本地范围内,以便我们稍后使用。
然后,它return是{next: fn}
形式的强制迭代器结构。然后,在该函数内部是我们的包装逻辑展开的地方。如果我们已经看到之前的锁定错误,那么我们总是 return Promise.reject(latchedError);
。如果没有错误,那么我们 return 一个手动构建的承诺。
在该承诺的执行程序函数中,我们通过将其添加到名为 kill
的更高范围 Set
中来注册我们的拒绝处理。这允许我们更高范围的 filestream.on('error', ....)
处理程序在通过调用该函数发现错误时拒绝此承诺。
然后,我们调用 linesIterator.next()
来获得它 return 的承诺。我们对该承诺的 resolve 和 reject 回调都感兴趣。如果该承诺得到正确解决,我们将从更高级别的范围中删除我们的拒绝处理程序(以启用我们范围的更好的垃圾收集),然后使用相同的解决值解决我们的 wrap/proxy 承诺。
如果 linesIterator promise 拒绝,我们只是通过我们的 wrap/proxy promise 传递拒绝。
我们自己的文件流错误处理
所以,现在是最后的解释。我们有这个错误处理程序正在监视流:
fileStream.on('error', (err) => {
latchedError = err;
// any open promises waiting on this stream, need to get rejected now
for (let fn of kill) {
fn(err);
}
});
这有两个作用。首先,它 stores/latches 错误,因此以后对 lines 迭代器的任何调用都将因先前的错误而被拒绝。其次,如果行迭代器有任何未决的承诺等待解决,它会循环遍历 kill
集合并拒绝这些承诺。这就是让 asyncIterator 承诺被正确拒绝的原因。这应该发生在 readline
代码中,但由于它没有正确执行,我们强制我们的 wrap/proxy 承诺拒绝,以便调用者在流出错时看到正确的拒绝。
最后,你可以这样做,因为所有丑陋的细节都隐藏在包裹的后面 asyncIterable
:
async function runIt() {
for await (let line of processLineByLine("xfile1.txt")) {
console.log(line);
}
}
runIt().then(() => {
console.log("done");
}).catch(err => {
console.log("final Error", err);
});
我是这样处理的:
new Promise(async (resolve, reject) => {
const input = fs.createReadStream(o.file)
input.on('error', (err: any) => {
// Handle errors here
reject(new Error(err.stack))
})
const crlfDelay = Infinity
const rl = readline.createInterface({ input, crlfDelay })
rl.on('line', (line) => {
// Handle lines here
})
await events.once(rl, 'close')
resolve(/* some value */)
})
不幸的是,我没有时间看完@jfriend00 的所有回答,但我确实阅读了他分享的第一段代码,但它对我不起作用,因为:
'open'
事件在 'error'
事件之前启动。我不知道我是否做错了什么,因为 'error'
发生在 之后 'open'
.
感觉很古怪
无论如何,似乎我编写的代码可以工作,但绝不认为它可以投入生产(至少在时间过去之前我没有发现任何问题与它)。
据我所知,节点 16 添加了一个异步方法 createReadStream
到 FileHandle
class (https://nodejs.org/docs/latest-v16.x/api/fs.html#filehandlecreatereadstreamoptions).
所以我猜你可以只使用那个方法, await
它并且(我假设)如果它失败了你会得到一个很好的错误,但我没有时间尝试那个,如果有人愿意的话请尝试并添加一个关于它的comment/answer。
请记住,该代码当然仅限于 node16 以上,而上面的代码适用于 node14。
基于 processLineByLine() 的示例,我注意到如果给定的文件名不存在,我们将无法捕获错误。在这种情况下,程序以类似以下内容结束:
UnhandledPromiseRejectionWarning: Error: ENOENT: no such file or directory
因此,我采用的引发可捕获错误的最简单方法是对 processLineByLine()
函数进行 2 次修改:
- 将其放入生成器中,例如
function*
await
文件存在检查await access(filename, fs.constants.F_OK)
最后我不得不将 readline.Interface
实例转换为异步生成器。 我特别不喜欢这最后一部分。生成的 lines()
函数如下:
export async function* lines(filename) {
await access(filename, fs.constants.F_OK)
const lines = readline.createInterface({
input: fs.createReadStream(filename),
crlfDelay: Infinity
})
for await (const l of lines) {
yield l
}
}
问题:是否有更好的方法使 lines()
成为异步迭代器或在文件名不存在时抛出错误?
BUG 报告: 关于@jfriend00 的观察,我在 nodejs 上打开了一个 Bug 问题:https://github.com/nodejs/node/issues/30831
嗯,这是一个棘手的问题。即使在飞行前检测文件是否存在也不能保证您可以成功打开它(它可能被锁定或有权限问题)并且在打开之前检测它是否存在是服务器开发中的经典竞争条件(小 window,但仍然是竞争条件)。
我仍然认为必须有更好的方法来从 fs.createReadStream()
中获取错误,但我能找到的唯一方法是将其包装在一个承诺中,该承诺仅在文件是成功打开。这使您可以从打开文件中获取错误并将其传播回 async
函数的调用者。这是它的样子:
const fs = require('fs');
const readline = require('readline');
function createReadStreamSafe(filename, options) {
return new Promise((resolve, reject) => {
const fileStream = fs.createReadStream(filename, options);
fileStream.on('error', reject).on('open', () => {
resolve(filestream);
});
});
}
async function processLineByLine(f) {
const fileStream = await createReadStreamSafe(f);
const rl = readline.createInterface({
input: fileStream,
crlfDelay: Infinity
});
for await (const line of rl) {
// Each line in input.txt will be successively available here as `line`.
console.log(`Line from file: ${line}`);
}
}
processLineByLine("nofile").catch(err => {
console.log("caught error");
});
这使得 processLineByLine()
return 的承诺将被拒绝,您可以在那里处理错误,这正是我认为您所要求的。如果我误解了你的要求,请澄清。
仅供参考,在我看来,这似乎是 readline.createInterface()
中的一个错误,因为它似乎应该在 for await (const line of rl)
的第一次迭代中拒绝,但事实并非如此。
因此,即使是这种解决方法也不会在流打开后检测到读取错误。这确实需要在 createInterface()
内部修复。我同意文件打开错误或读取错误都应在 for await (const line of rl)
.
文件打开问题的另一种解决方法是使用 await fs.promises.open(...)
预打开文件并将 fd
传递给 fs.createReadStream
然后您会看到错误打开自己。
不同的解决方案 - 包装 readLine 迭代器以添加错误处理
警告,这看起来有点 hack,但这是一个非常有趣的学习项目,因为我最终不得不用自己的 readline asyncIterator
包装起来,以便在我检测到readStream
错误(缺少 readline
库的错误处理)。
我的任务是弄清楚如何编写一个 processLineByLine()
函数,该函数将 return 和 asyncIterator
正确拒绝流错误(即使 readline
代码在这方面有错误),同时仍在内部使用 readline 库。
目标是能够像这样编写代码:
for await (let line of processLineByLine("somefile1.txt")) {
console.log(line);
}
正确处理了内部使用的readStream的错误,无论是文件不存在,存在但打不开,甚至后面读取时遇到读取错误。由于我不是 changing/fixing 内部的 readline 接口代码,我必须在 readStream 上安装我自己的 error
侦听器,当我在那里看到错误时,我需要从readline接口拒绝。
这是我最终得到的结果:
// This is an experiment to wrap the lines asyncIterator with our own iterator
// so we can reject when there's been an error on the readStream. It's really
// ugly, but does work.
const fs = require('fs');
const readline = require('readline');
function processLineByLine(filename, options = {}) {
const fileStream = fs.createReadStream(filename, options);
let latchedError = null;
let kill = new Set();
fileStream.on('error', (err) => {
latchedError = err;
// any open promises waiting on this stream, need to get rejected now
for (let fn of kill) {
fn(err);
}
});
const lines = readline.createInterface({
input: fileStream,
crlfDelay: Infinity
});
// create our own little asyncIterator that wraps the lines asyncIterator
// so we can reject when we need to
function asyncIterator() {
const linesIterator = lines[Symbol.asyncIterator]();
return {
next: function() {
if (latchedError) {
return Promise.reject(latchedError);
} else {
return new Promise((resolve, reject) => {
// save reject handlers in higher scope so they can be called
// from the stream error handler
kill.add(reject);
let p = linesIterator.next();
// have our higher level promise track the iterator promise
// except when we reject it from the outside upon stream error
p.then((data => {
// since we're resolving now, let's removing our reject
// handler from the kill storage. This will allow this scope
// to be properly garbage collected
kill.delete(reject);
resolve(data);
}), reject);
});
}
}
}
}
var asyncIterable = {
[Symbol.asyncIterator]: asyncIterator
};
return asyncIterable;
}
async function runIt() {
for await (let line of processLineByLine("xfile1.txt")) {
console.log(line);
}
}
runIt().then(() => {
console.log("done");
}).catch(err => {
console.log("final Error", err);
});
有关其工作原理的一些解释...
我们自己在流上的错误监控
首先,你可以看到这个:
fileStream.on('error', (err) => {
latchedError = err;
// any open promises waiting on this stream, need to get rejected now
for (let fn of kill) {
fn(err);
}
});
这是我们自己在readStream上的错误监控,弥补了readline内部错误处理的缺失。任何时候我们看到一个错误,我们将它保存在一个更高范围的变量中以供以后使用,如果有任何从 readline 为这个流注册的未决承诺,我们 "kill" 他们(拒绝他们,你稍后会看到如何有效)。
文件打开错误没有特殊处理
这里的部分目标是摆脱先前解决方案中对文件打开错误的特殊处理。我们希望 readStream 上的任何错误都触发对 asyncIterable 的拒绝,因此这是一种更通用的机制。文件打开错误在此错误处理中被捕获,就像任何其他读取错误一样。
我们自己的asyncIterable和asyncIterator
调用 readline.createInterace()
return 一个 asyncIterable。它与常规可迭代对象基本相同,您可以在其上调用特殊的 属性 以获得 asyncIterator
。 asyncIterator
有一个 .next()
属性 就像一个普通的迭代器一样,除了当 asyncIterator.next()
被调用时,它 return 是一个解析为对象而不是对象的承诺一个对象。
所以,这就是 for await (let line of lines)
的工作原理。它首先调用 lines[Symbol.asyncIterator]()
来获取一个 asyncIterator。然后,在返回的 asyncIterator
上,它重复 await asyncIterator.next()
等待 asyncIterator.next()
returns.
现在,readline.createInterface()
已经 return 这样的 asyncIterable
。但是,它不能正常工作。当 readStream
出现错误时,它不会在每次迭代时拒绝 .next()
编辑的承诺 return。事实上,这个承诺永远不会被拒绝或解决。所以,事情陷入僵局。在我的测试应用程序中,应用程序将只是退出,因为 readStream 已完成(在错误之后)并且不再有任何东西阻止应用程序退出,即使 promise 仍然悬而未决。
因此,我需要一种方法来强制拒绝 readlineIterator.next()
之前 return 编辑并且目前正被 for await (...)
等待的承诺。好吧,promise 不提供用于拒绝它的外部接口,而且我们无法访问可以拒绝它的 readline
实现的内部。
我的解决方案是用我自己的 readlineIterator 包装 readlineIterator 作为一种代理。然后,我们自己的错误检测器发现错误,并且 readline 中有未完成的承诺,我可以使用我的 proxy/wrapper 强制拒绝那些未完成的承诺。这将导致 for await (...)
看到拒绝并得到正确的错误。而且,它有效。
我花了一段时间才充分了解 asyncIterators
的工作原理,以便能够包装一个。我非常感谢这篇 Asynchronous Iterators in JavaScript 文章,它为构建您自己的 asyncIterable 和 asyncIterator 提供了一些非常有用的代码示例。这实际上是本练习中真正学习的地方,其他人可能会通过理解上面代码中的工作原理来学习。
强制拒绝包装承诺
此代码中的 "ugliness" 强制承诺从该承诺的拒绝处理程序的通常范围之外拒绝。这是通过将拒绝处理程序存储在更高级别的范围内来完成的,其中 readStream
的错误处理可以调用承诺拒绝的触发器。可能有一种更优雅的编码方式,但这行得通。
制作我们自己的 asyncIterable
异步可迭代对象只是一个对象,上面有一个名为 [Symbol.asyncIterator]
的 属性。 属性 必须是一个函数,当不带参数调用时,return 是一个 asyncIterator.
所以,这是我们的 asyncIterable
.
var asyncIterable = {
[Symbol.asyncIterator]: asyncIterator
};
制作我们自己的异步迭代器
asyncIterator
是一个函数,调用时 return 是一个带有 next()
属性 的对象。每次 obj.next()
被调用时,它 return 是一个解析为通常的迭代器元组对象 {done, value}
的承诺。我们不必担心已解析的值,因为我们将从 readline 的迭代器中获取它。所以,这是我们的 asyncIterator
:
// create our own little asyncIterator that wraps the lines asyncIterator
// so we can reject when we need to
function asyncIterator() {
const linesIterator = lines[Symbol.asyncIterator]();
return {
next: function() {
if (latchedError) {
return Promise.reject(latchedError);
} else {
return new Promise((resolve, reject) => {
// save reject handlers in higher scope so they can be called
// from the stream error handler
kill.push(reject);
let p = linesIterator.next();
// have our higher level promise track the iterator promise
// except when we reject it from the outside upon stream error
p.then(resolve, reject);
});
}
}
}
}
首先,它从 readline 接口(我们是 proxying/wrapping 的接口)获取 asyncIterator 并将其存储在本地范围内,以便我们稍后使用。
然后,它return是{next: fn}
形式的强制迭代器结构。然后,在该函数内部是我们的包装逻辑展开的地方。如果我们已经看到之前的锁定错误,那么我们总是 return Promise.reject(latchedError);
。如果没有错误,那么我们 return 一个手动构建的承诺。
在该承诺的执行程序函数中,我们通过将其添加到名为 kill
的更高范围 Set
中来注册我们的拒绝处理。这允许我们更高范围的 filestream.on('error', ....)
处理程序在通过调用该函数发现错误时拒绝此承诺。
然后,我们调用 linesIterator.next()
来获得它 return 的承诺。我们对该承诺的 resolve 和 reject 回调都感兴趣。如果该承诺得到正确解决,我们将从更高级别的范围中删除我们的拒绝处理程序(以启用我们范围的更好的垃圾收集),然后使用相同的解决值解决我们的 wrap/proxy 承诺。
如果 linesIterator promise 拒绝,我们只是通过我们的 wrap/proxy promise 传递拒绝。
我们自己的文件流错误处理
所以,现在是最后的解释。我们有这个错误处理程序正在监视流:
fileStream.on('error', (err) => {
latchedError = err;
// any open promises waiting on this stream, need to get rejected now
for (let fn of kill) {
fn(err);
}
});
这有两个作用。首先,它 stores/latches 错误,因此以后对 lines 迭代器的任何调用都将因先前的错误而被拒绝。其次,如果行迭代器有任何未决的承诺等待解决,它会循环遍历 kill
集合并拒绝这些承诺。这就是让 asyncIterator 承诺被正确拒绝的原因。这应该发生在 readline
代码中,但由于它没有正确执行,我们强制我们的 wrap/proxy 承诺拒绝,以便调用者在流出错时看到正确的拒绝。
最后,你可以这样做,因为所有丑陋的细节都隐藏在包裹的后面 asyncIterable
:
async function runIt() {
for await (let line of processLineByLine("xfile1.txt")) {
console.log(line);
}
}
runIt().then(() => {
console.log("done");
}).catch(err => {
console.log("final Error", err);
});
我是这样处理的:
new Promise(async (resolve, reject) => {
const input = fs.createReadStream(o.file)
input.on('error', (err: any) => {
// Handle errors here
reject(new Error(err.stack))
})
const crlfDelay = Infinity
const rl = readline.createInterface({ input, crlfDelay })
rl.on('line', (line) => {
// Handle lines here
})
await events.once(rl, 'close')
resolve(/* some value */)
})
不幸的是,我没有时间看完@jfriend00 的所有回答,但我确实阅读了他分享的第一段代码,但它对我不起作用,因为:
'open'
事件在 'error'
事件之前启动。我不知道我是否做错了什么,因为 'error'
发生在 之后 'open'
.
无论如何,似乎我编写的代码可以工作,但绝不认为它可以投入生产(至少在时间过去之前我没有发现任何问题与它)。
据我所知,节点 16 添加了一个异步方法 createReadStream
到 FileHandle
class (https://nodejs.org/docs/latest-v16.x/api/fs.html#filehandlecreatereadstreamoptions).
所以我猜你可以只使用那个方法, await
它并且(我假设)如果它失败了你会得到一个很好的错误,但我没有时间尝试那个,如果有人愿意的话请尝试并添加一个关于它的comment/answer。
请记住,该代码当然仅限于 node16 以上,而上面的代码适用于 node14。