Nodejs `fs.createReadStream` 作为承诺
Nodejs `fs.createReadStream` as promise
我正在努力让 fs.createReadStream
像承诺一样工作,所以在读取整个文件后,它就会被解决。
在下面的情况下,我暂停流,执行可等待的方法并恢复。
- 如何让
.on('end'...
最后执行
- 如果 1. 不可能,为什么 `.on('wont be fired',也许我可以用它来解决承诺。
function parseFile<T>(filePath: string, row: (x: T) => void, err: (x) => void, end: (x) => void) {
return new Promise((resolve, reject) => {
const stream = fs.createReadStream(filePath);
stream.on('data', async data => {
try {
stream.pause();
await row(data);
} finally {
stream.resume();
}
})
.on('end', (rowCount: number) => {
resolve();// NOT REALLY THE END row(data) is still being called after this
})
.on('close', () => {
resolve();// NEVER BEING CALLED
})
.on('error', (rowCount: number) => {
reject();// NEVER GETS HERE, AS EXPECTED
})
})
}
更新
这里可以实际测试一下:https://stackblitz.com/edit/node-czktjh?file=index.js
运行 node index.js
输出应该是 1000 而不是 1
谢谢
需要注意的事项。您已经从问题的当前版本中删除了行处理,因此正在大块读取流。它似乎只读取两个块中的整个文件,因此只有两个 data
事件,所以这里的预期计数是 2,而不是 1000。
我认为此代码出现问题是因为 stream.pause()
不会暂停 end
事件的生成 - 它只会暂停未来的 data
事件。如果最后一个 data
事件已被触发,然后你 await
在那个 data
事件的处理中(这会导致你的 data
事件处理程序立即 return promise,流会认为它已经完成并且 end
事件仍然会在你完成等待最后一个 data
事件处理中的函数之前触发。记住,数据事件处理程序不是 promise-意识到。而且,stream.pause()
似乎只影响 data
事件,而不影响 end
事件。
我可以想象一个带有标志的变通方法,该标志跟踪您是否仍在处理数据事件并推迟处理 end
事件,直到您完成最后一个 data
事件。稍后我将为此添加代码,说明如何使用该标志。
仅供参考,丢失的 close
事件是另一个流怪异事件。您的 nodejs 程序实际上在 close
事件触发之前终止。如果你把它放在程序的开头:
setTimeout(() => { console.log('done with timer');}, 5000);
然后,您将看到关闭事件,因为计时器会阻止您的 nodejs 程序在关闭事件触发之前退出。我并不是建议将此作为任何问题的解决方案,只是为了说明 close
事件仍然存在,并且如果您的程序在它有机会之前没有退出则想要触发。
以下代码演示了如何使用标志来解决暂停问题。当您 运行 这段代码时,您只会看到 2 个 data
事件,而不是 1000 个,因为这段代码不是读取行,而是读取比那大得多的块。所以,这个的预期结果不是1000。
// run `node index.js` in the terminal
const fs = require('fs');
const parseFile = row => {
let paused = true;
let ended = false;
let dataCntr = 0;
return new Promise((resolve, reject) => {
const stream = fs.createReadStream('./generated.data.csv');
stream
.on('data', async data => {
++dataCntr;
try {
stream.pause();
paused = true;
await row(data);
} finally {
paused = false;
stream.resume();
if (ended) {
console.log(`received ${dataCntr} data events`);
resolve();
}
}
})
.on('end', rowCount => {
ended = true;
if (!paused) {
console.log(`received ${dataCntr} data events`);
resolve();
}
})
.on('close', () => {
//resolve();
})
.on('error', rowCount => {
reject();
});
});
};
(async () => {
let count = 0;
await parseFile(async row => {
await new Promise(resolve => setTimeout(resolve, 50)); //sleep
count++;
});
console.log(`lines executed: ${count}, the expected is more than 1`);
})();
仅供参考,我仍然认为您问题的原始版本存在我在第一条评论中提到的问题 - 您没有暂停正确的流。这里记录的是另一个问题(在最后一个 data
事件中你的 await
完成之前你可以得到 end
)。
我正在努力让 fs.createReadStream
像承诺一样工作,所以在读取整个文件后,它就会被解决。
在下面的情况下,我暂停流,执行可等待的方法并恢复。
- 如何让
.on('end'...
最后执行 - 如果 1. 不可能,为什么 `.on('wont be fired',也许我可以用它来解决承诺。
function parseFile<T>(filePath: string, row: (x: T) => void, err: (x) => void, end: (x) => void) {
return new Promise((resolve, reject) => {
const stream = fs.createReadStream(filePath);
stream.on('data', async data => {
try {
stream.pause();
await row(data);
} finally {
stream.resume();
}
})
.on('end', (rowCount: number) => {
resolve();// NOT REALLY THE END row(data) is still being called after this
})
.on('close', () => {
resolve();// NEVER BEING CALLED
})
.on('error', (rowCount: number) => {
reject();// NEVER GETS HERE, AS EXPECTED
})
})
}
更新
这里可以实际测试一下:https://stackblitz.com/edit/node-czktjh?file=index.js
运行 node index.js
输出应该是 1000 而不是 1
谢谢
需要注意的事项。您已经从问题的当前版本中删除了行处理,因此正在大块读取流。它似乎只读取两个块中的整个文件,因此只有两个 data
事件,所以这里的预期计数是 2,而不是 1000。
我认为此代码出现问题是因为 stream.pause()
不会暂停 end
事件的生成 - 它只会暂停未来的 data
事件。如果最后一个 data
事件已被触发,然后你 await
在那个 data
事件的处理中(这会导致你的 data
事件处理程序立即 return promise,流会认为它已经完成并且 end
事件仍然会在你完成等待最后一个 data
事件处理中的函数之前触发。记住,数据事件处理程序不是 promise-意识到。而且,stream.pause()
似乎只影响 data
事件,而不影响 end
事件。
我可以想象一个带有标志的变通方法,该标志跟踪您是否仍在处理数据事件并推迟处理 end
事件,直到您完成最后一个 data
事件。稍后我将为此添加代码,说明如何使用该标志。
仅供参考,丢失的 close
事件是另一个流怪异事件。您的 nodejs 程序实际上在 close
事件触发之前终止。如果你把它放在程序的开头:
setTimeout(() => { console.log('done with timer');}, 5000);
然后,您将看到关闭事件,因为计时器会阻止您的 nodejs 程序在关闭事件触发之前退出。我并不是建议将此作为任何问题的解决方案,只是为了说明 close
事件仍然存在,并且如果您的程序在它有机会之前没有退出则想要触发。
以下代码演示了如何使用标志来解决暂停问题。当您 运行 这段代码时,您只会看到 2 个 data
事件,而不是 1000 个,因为这段代码不是读取行,而是读取比那大得多的块。所以,这个的预期结果不是1000。
// run `node index.js` in the terminal
const fs = require('fs');
const parseFile = row => {
let paused = true;
let ended = false;
let dataCntr = 0;
return new Promise((resolve, reject) => {
const stream = fs.createReadStream('./generated.data.csv');
stream
.on('data', async data => {
++dataCntr;
try {
stream.pause();
paused = true;
await row(data);
} finally {
paused = false;
stream.resume();
if (ended) {
console.log(`received ${dataCntr} data events`);
resolve();
}
}
})
.on('end', rowCount => {
ended = true;
if (!paused) {
console.log(`received ${dataCntr} data events`);
resolve();
}
})
.on('close', () => {
//resolve();
})
.on('error', rowCount => {
reject();
});
});
};
(async () => {
let count = 0;
await parseFile(async row => {
await new Promise(resolve => setTimeout(resolve, 50)); //sleep
count++;
});
console.log(`lines executed: ${count}, the expected is more than 1`);
})();
仅供参考,我仍然认为您问题的原始版本存在我在第一条评论中提到的问题 - 您没有暂停正确的流。这里记录的是另一个问题(在最后一个 data
事件中你的 await
完成之前你可以得到 end
)。