Nodejs `fs.createReadStream` 作为承诺

Nodejs `fs.createReadStream` as promise

我正在努力让 fs.createReadStream 像承诺一样工作,所以在读取整个文件后,它就会被解决。

在下面的情况下,我暂停流,执行可等待的方法并恢复。

  1. 如何让.on('end'...最后执行
  2. 如果 1. 不可能,为什么 `.on('wont be fired',也许我可以用它来解决承诺。
function parseFile<T>(filePath: string, row: (x: T) => void, err: (x) => void, end: (x) => void) {
        return new Promise((resolve, reject) => {
            const stream = fs.createReadStream(filePath);
            stream.on('data', async data => {
                    try {
                        stream.pause();
                        await row(data);
                    } finally {
                        stream.resume();
                    }
                })
                .on('end', (rowCount: number) => {
                    resolve();// NOT REALLY THE END row(data) is still being called after this
                })
                .on('close', () => {
                    resolve();// NEVER BEING CALLED
                })
                .on('error', (rowCount: number) => {
                    reject();// NEVER GETS HERE, AS EXPECTED
                })
        })
}

更新
这里可以实际测试一下:https://stackblitz.com/edit/node-czktjh?file=index.js

运行 node index.js

输出应该是 1000 而不是 1

谢谢

需要注意的事项。您已经从问题的当前版本中删除了行处理,因此正在大块读取流。它似乎只读取两个块中的整个文件,因此只有两个 data 事件,所以这里的预期计数是 2,而不是 1000。

我认为此代码出现问题是因为 stream.pause() 不会暂停 end 事件的生成 - 它只会暂停未来的 data 事件。如果最后一个 data 事件已被触发,然后你 await 在那个 data 事件的处理中(这会导致你的 data 事件处理程序立即 return promise,流会认为它已经完成并且 end 事件仍然会在你完成等待最后一个 data 事件处理中的函数之前触发。记住,数据事件处理程序不是 promise-意识到。而且,stream.pause() 似乎只影响 data 事件,而不影响 end 事件。

我可以想象一个带有标志的变通方法,该标志跟踪您是否仍在处理数据事件并推迟处理 end 事件,直到您完成最后一个 data 事件。稍后我将为此添加代码,说明如何使用该标志。

仅供参考,丢失的 close 事件是另一个流怪异事件。您的 nodejs 程序实际上在 close 事件触发之前终止。如果你把它放在程序的开头:

setTimeout(() => { console.log('done with timer');}, 5000);

然后,您将看到关闭事件,因为计时器会阻止您的 nodejs 程序在关闭事件触发之前退出。我并不是建议将此作为任何问题的解决方案,只是为了说明 close 事件仍然存在,并且如果您的程序在它有机会之前没有退出则想要触发。


以下代码演示了如何使用标志来解决暂停问题。当您 运行 这段代码时,您只会看到 2 个 data 事件,而不是 1000 个,因为这段代码不是读取行,而是读取比那大得多的块。所以,这个的预期结果不是1000。

// run `node index.js` in the terminal
const fs = require('fs');

const parseFile = row => {
  let  paused = true;
  let ended = false;
  let dataCntr = 0;
  return new Promise((resolve, reject) => {
    const stream = fs.createReadStream('./generated.data.csv');
    stream
      .on('data', async data => {
        ++dataCntr;
        try {
          stream.pause();
          paused = true;
          await row(data);
        } finally {
          paused = false;
          stream.resume();
          if (ended) {
            console.log(`received ${dataCntr} data events`);
            resolve();
          }
        }
      })
      .on('end', rowCount => {
        ended = true;
        if (!paused) {
          console.log(`received ${dataCntr} data events`);
          resolve();
        }
      })
      .on('close', () => {
        //resolve();
      })
      .on('error', rowCount => {
        reject();
      });
  });
};
(async () => {
  let count = 0;
  await parseFile(async row => {
    await new Promise(resolve => setTimeout(resolve, 50)); //sleep
    count++;
  });
  console.log(`lines executed: ${count}, the expected is more than 1`);
})();

仅供参考,我仍然认为您问题的原始版本存在我在第一条评论中提到的问题 - 您没有暂停正确的流。这里记录的是另一个问题(在最后一个 data 事件中你的 await 完成之前你可以得到 end)。