如何处理来自 fs readline.Interface 异步迭代器的错误

How to handle error from fs readline.Interface async iterator

基于 processLineByLine() 的示例,我注意到如果给定的文件名不存在,我们将无法捕获错误。在这种情况下,程序以类似以下内容结束:

UnhandledPromiseRejectionWarning: Error: ENOENT: no such file or directory

因此,我采用的引发可捕获错误的最简单方法是对 processLineByLine() 函数进行 2 次修改:

  1. 将其放入生成器中,例如 function*
  2. await 文件存在检查 await access(filename, fs.constants.F_OK)

最后我不得不将 readline.Interface 实例转换为异步生成器。 我特别不喜欢这最后一部分。生成的 lines() 函数如下:

export async function* lines(filename) {
    await access(filename, fs.constants.F_OK)
    const lines = readline.createInterface({
        input: fs.createReadStream(filename),
        crlfDelay: Infinity
    })
    for await (const l of lines) {
        yield l
    }
}

问题:是否有更好的方法使 lines() 成为异步迭代器或在文件名不存在时抛出错误?

BUG 报告: 关于@jfriend00 的观察,我在 nodejs 上打开了一个 Bug 问题:https://github.com/nodejs/node/issues/30831

嗯,这是一个棘手的问题。即使在飞行前检测文件是否存在也不能保证您可以成功打开它(它可能被锁定或有权限问题)并且在打开之前检测它是否存在是服务器开发中的经典竞争条件(小 window,但仍然是竞争条件)。

我仍然认为必须有更好的方法来从 fs.createReadStream() 中获取错误,但我能找到的唯一方法是将其包装在一个承诺中,该承诺仅在文件是成功打开。这使您可以从打开文件中获取错误并将其传播回 async 函数的调用者。这是它的样子:

const fs = require('fs');
const readline = require('readline');

function createReadStreamSafe(filename, options) {
    return new Promise((resolve, reject) => {
        const fileStream = fs.createReadStream(filename, options);
        fileStream.on('error', reject).on('open', () => {
            resolve(filestream);
        });

    });
}

async function processLineByLine(f) {
  const fileStream = await createReadStreamSafe(f);

  const rl = readline.createInterface({
    input: fileStream,
    crlfDelay: Infinity
  });

  for await (const line of rl) {
    // Each line in input.txt will be successively available here as `line`.
    console.log(`Line from file: ${line}`);
  }
}

processLineByLine("nofile").catch(err => {
    console.log("caught error");
});

这使得 processLineByLine() return 的承诺将被拒绝,您可以在那里处理错误,这正是我认为您所要求的。如果我误解了你的要求,请澄清。

仅供参考,在我看来,这似乎是 readline.createInterface() 中的一个错误,因为它似乎应该在 for await (const line of rl) 的第一次迭代中拒绝,但事实并非如此。

因此,即使是这种解决方法也不会在流打开后检测到读取错误。这确实需要在 createInterface() 内部修复。我同意文件打开错误或读取错误都应在 for await (const line of rl).

上显示为拒绝

文件打开问题的另一种解决方法是使用 await fs.promises.open(...) 预打开文件并将 fd 传递给 fs.createReadStream 然后您会看到错误打开自己。


不同的解决方案 - 包装 readLine 迭代器以添加错误处理

警告,这看起来有点 hack,但这是一个非常有趣的学习项目,因为我最终不得不用自己的 readline asyncIterator 包装起来,以便在我检测到readStream 错误(缺少 readline 库的错误处理)。

我的任务是弄清楚如何编写一个 processLineByLine() 函数,该函数将 return 和 asyncIterator 正确拒绝流错误(即使 readline 代码在这方面有错误),同时仍在内部使用 readline 库。

目标是能够像这样编写代码:

for await (let line of processLineByLine("somefile1.txt")) {
     console.log(line);
 }

正确处理了内部使用的readStream的错误,无论是文件不存在,存在但打不开,甚至后面读取时遇到读取错误。由于我不是 changing/fixing 内部的 readline 接口代码,我必须在 readStream 上安装我自己的 error 侦听器,当我在那里看到错误时,我需要从readline接口拒绝。

这是我最终得到的结果:

// This is an experiment to wrap the lines asyncIterator with our own iterator
// so we can reject when there's been an error on the readStream.  It's really
// ugly, but does work.

const fs = require('fs');
const readline = require('readline');

function processLineByLine(filename, options = {}) {
    const fileStream = fs.createReadStream(filename, options);
    let latchedError = null;
    let kill = new Set();

    fileStream.on('error', (err) => {
        latchedError = err;
        // any open promises waiting on this stream, need to get rejected now
        for (let fn of kill) {
            fn(err);
        }
    });

    const lines = readline.createInterface({
        input: fileStream,
        crlfDelay: Infinity
    });

    // create our own little asyncIterator that wraps the lines asyncIterator
    //   so we can reject when we need to
    function asyncIterator() {
        const linesIterator = lines[Symbol.asyncIterator]();
        return {
            next: function() {
                if (latchedError) {
                    return Promise.reject(latchedError);
                } else {
                    return new Promise((resolve, reject) => {
                        // save reject handlers in higher scope so they can be called 
                        // from the stream error handler
                        kill.add(reject);

                        let p = linesIterator.next();

                        // have our higher level promise track the iterator promise
                        // except when we reject it from the outside upon stream error
                        p.then((data => {
                            // since we're resolving now, let's removing our reject
                            // handler from the kill storage.  This will allow this scope
                            // to be properly garbage collected
                            kill.delete(reject);
                            resolve(data);
                        }), reject);
                    });
                }
            }
        }
    }

    var asyncIterable = {
        [Symbol.asyncIterator]: asyncIterator
    };

    return asyncIterable;
}

async function runIt() {
    for await (let line of processLineByLine("xfile1.txt")) {
         console.log(line);
     }
 }

runIt().then(() => {
    console.log("done");
}).catch(err => {
    console.log("final Error", err);
});

有关其工作原理的一些解释...

我们自己在流上的错误监控

首先,你可以看到这个:

    fileStream.on('error', (err) => {
        latchedError = err;
        // any open promises waiting on this stream, need to get rejected now
        for (let fn of kill) {
            fn(err);
        }
    });

这是我们自己在readStream上的错误监控,弥补了readline内部错误处理的缺失。任何时候我们看到一个错误,我们将它保存在一个更高范围的变量中以供以后使用,如果有任何从 readline 为这个流注册的未决承诺,我们 "kill" 他们(拒绝他们,你稍后会看到如何有效)。

文件打开错误没有特殊处理

这里的部分目标是摆脱先前解决方案中对文件打开错误的特殊处理。我们希望 readStream 上的任何错误都触发对 asyncIterable 的拒绝,因此这是一种更通用的机制。文件打开错误在此错误处理中被捕获,就像任何其他读取错误一样。

我们自己的asyncIterable和asyncIterator

调用 readline.createInterace() return 一个 asyncIterable。它与常规可迭代对象基本相同,您可以在其上调用特殊的 属性 以获得 asyncIteratorasyncIterator 有一个 .next() 属性 就像一个普通的迭代器一样,除了当 asyncIterator.next() 被调用时,它 return 是一个解析为对象而不是对象的承诺一个对象。

所以,这就是 for await (let line of lines) 的工作原理。它首先调用 lines[Symbol.asyncIterator]() 来获取一个 asyncIterator。然后,在返回的 asyncIterator 上,它重复 await asyncIterator.next() 等待 asyncIterator.next() returns.

的承诺

现在,readline.createInterface() 已经 return 这样的 asyncIterable。但是,它不能正常工作。当 readStream 出现错误时,它不会在每次迭代时拒绝 .next() 编辑的承诺 return。事实上,这个承诺永远不会被拒绝或解决。所以,事情陷入僵局。在我的测试应用程序中,应用程序将只是退出,因为 readStream 已完成(在错误之后)并且不再有任何东西阻止应用程序退出,即使 promise 仍然悬而未决。

因此,我需要一种方法来强制拒绝 readlineIterator.next() 之前 return 编辑并且目前正被 for await (...) 等待的承诺。好吧,promise 不提供用于拒绝它的外部接口,而且我们无法访问可以拒绝它的 readline 实现的内部。

我的解决方案是用我自己的 readlineIterator 包装 readlineIterator 作为一种代理。然后,我们自己的错误检测器发现错误,并且 readline 中有未完成的承诺,我可以使用我的 proxy/wrapper 强制拒绝那些未完成的承诺。这将导致 for await (...) 看到拒绝并得到正确的错误。而且,它有效。

我花了一段时间才充分了解 asyncIterators 的工作原理,以便能够包装一个。我非常感谢这篇 Asynchronous Iterators in JavaScript 文章,它为构建您自己的 asyncIterable 和 asyncIterator 提供了一些非常有用的代码示例。这实际上是本练习中真正学习的地方,其他人可能会通过理解上面代码中的工作原理来学习。

强制拒绝包装承诺

此代码中的 "ugliness" 强制承诺从该承诺的拒绝处理程序的通常范围之外拒绝。这是通过将拒绝处理程序存储在更高级别的范围内来完成的,其中 readStream 的错误处理可以调用​​承诺拒绝的触发器。可能有一种更优雅的编码方式,但这行得通。

制作我们自己的 asyncIterable

异步可迭代对象只是一个对象,上面有一个名为 [Symbol.asyncIterator] 的 属性。 属性 必须是一个函数,当不带参数调用时,return 是一个 asyncIterator. 所以,这是我们的 asyncIterable.

var asyncIterable = {
    [Symbol.asyncIterator]: asyncIterator
};

制作我们自己的异步迭代器

asyncIterator 是一个函数,调用时 return 是一个带有 next() 属性 的对象。每次 obj.next() 被调用时,它 return 是一个解析为通常的迭代器元组对象 {done, value} 的承诺。我们不必担心已解析的值,因为我们将从 readline 的迭代器中获取它。所以,这是我们的 asyncIterator:

// create our own little asyncIterator that wraps the lines asyncIterator
//   so we can reject when we need to
function asyncIterator() {
    const linesIterator = lines[Symbol.asyncIterator]();
    return {
        next: function() {
            if (latchedError) {
                return Promise.reject(latchedError);
            } else {
                return new Promise((resolve, reject) => {
                    // save reject handlers in higher scope so they can be called 
                    // from the stream error handler
                    kill.push(reject);

                    let p = linesIterator.next();

                    // have our higher level promise track the iterator promise
                    // except when we reject it from the outside upon stream error
                    p.then(resolve, reject);
                });
            }
        }
    }
}

首先,它从 readline 接口(我们是 proxying/wrapping 的接口)获取 asyncIterator 并将其存储在本地范围内,以便我们稍后使用。

然后,它return是{next: fn}形式的强制迭代器结构。然后,在该函数内部是我们的包装逻辑展开的地方。如果我们已经看到之前的锁定错误,那么我们总是 return Promise.reject(latchedError);。如果没有错误,那么我们 return 一个手动构建的承诺。

在该承诺的执行程序函数中,我们通过将其添加到名为 kill 的更高范围 Set 中来注册我们的拒绝处理。这允许我们更高范围的 filestream.on('error', ....) 处理程序在通过调用该函数发现错误时拒绝此承诺。

然后,我们调用 linesIterator.next() 来获得它 return 的承诺。我们对该承诺的 resolve 和 reject 回调都感兴趣。如果该承诺得到正确解决,我们将从更高级别的范围中删除我们的拒绝处理程序(以启用我们范围的更好的垃圾收集),然后使用相同的解决值解决我们的 wrap/proxy 承诺。

如果 linesIterator promise 拒绝,我们只是通过我们的 wrap/proxy promise 传递拒绝。

我们自己的文件流错误处理

所以,现在是最后的解释。我们有这个错误处理程序正在监视流:

fileStream.on('error', (err) => {
    latchedError = err;
    // any open promises waiting on this stream, need to get rejected now
    for (let fn of kill) {
        fn(err);
    }
});

这有两个作用。首先,它 stores/latches 错误,因此以后对 lines 迭代器的任何调用都将因先前的错误而被拒绝。其次,如果行迭代器有任何未决的承诺等待解决,它会循环遍历 kill 集合并拒绝这些承诺。这就是让 asyncIterator 承诺被正确拒绝的原因。这应该发生在 readline 代码中,但由于它没有正确执行,我们强制我们的 wrap/proxy 承诺拒绝,以便调用者在流出错时看到正确的拒绝。


最后,你可以这样做,因为所有丑陋的细节都隐藏在包裹的后面 asyncIterable:

async function runIt() {
    for await (let line of processLineByLine("xfile1.txt")) {
         console.log(line);
     }
 }

runIt().then(() => {
    console.log("done");
}).catch(err => {
    console.log("final Error", err);
});

我是这样处理的:

  new Promise(async (resolve, reject) => {
    const input = fs.createReadStream(o.file)
    input.on('error', (err: any) => {
      // Handle errors here
      reject(new Error(err.stack))
    })
    const crlfDelay = Infinity
    const rl = readline.createInterface({ input, crlfDelay })
    rl.on('line', (line) => {
      // Handle lines here
    })
    await events.once(rl, 'close')
    resolve(/* some value */)
  })

不幸的是,我没有时间看完@jfriend00 的所有回答,但我确实阅读了他分享的第一段代码,但它对我不起作用,因为:
'open' 事件在 'error' 事件之前启动。我不知道我是否做错了什么,因为 'error' 发生在 之后 'open'.

感觉很古怪

无论如何,似乎我编写的代码可以工作,但绝不认为它可以投入生产(至少在时间过去之前我没有发现任何问题与它)。

据我所知,节点 16 添加了一个异步方法 createReadStreamFileHandle class (https://nodejs.org/docs/latest-v16.x/api/fs.html#filehandlecreatereadstreamoptions).
所以我猜你可以只使用那个方法, await 它并且(我假设)如果它失败了你会得到一个很好的错误,但我没有时间尝试那个,如果有人愿意的话请尝试并添加一个关于它的comment/answer。

请记住,该代码当然仅限于 node16 以上,而上面的代码适用于 node14。