How to prevent more than x number of files from being opened by fs.createReadStream?
In my code I call a nested fs.createReadStream(...) inside a readline.on("line", ...) event handler. Originally I let the program fail outright, because fs would create more than 1000 read streams and the fail-safe would kick in and stop it.
I added a check before opening each read stream to see how many files are open, and if more are open than I want, I stall until some of them close. However, because the code is called asynchronously, it appears that over 600 checks pass and open files before the other tasks begin to stall. I'm having trouble coming up with a way to limit the number of read streams, and any solution would be appreciated.
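One way such a limit could be enforced, sketched below purely for illustration (the createSemaphore helper is hypothetical, not part of the code further down), is to make the capacity check and the slot reservation a single synchronous step, so concurrent callers cannot all slip past the check before the counter is bumped:

// Hypothetical helper, for illustration: a minimal counting semaphore.
function createSemaphore(max) {
    let count = 0;          // slots currently held
    const waiters = [];     // resolvers for callers waiting on a slot

    return {
        acquire() {
            if (count < max) {
                count++;                    // claim a slot synchronously
                return Promise.resolve();
            }
            // no free slot: wait until release() hands one over
            return new Promise(resolve => waiters.push(resolve));
        },
        release() {
            const next = waiters.shift();
            if (next) {
                next();                     // pass the slot straight to the next waiter
            } else {
                count--;                    // free the slot
            }
        }
    };
}

// Usage sketch: reserve a slot before opening each read stream.
const streamSlots = createSemaphore(50);

async function limitedCheck(line) {
    await streamSlots.acquire();
    try {
        // open the read stream and scan it here
    } finally {
        streamSlots.release();              // always give the slot back
    }
}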
The purpose of the whole script is to compare two very large files line by line. Each file has more than 200,000 lines.
I've left out the functions formatLine and checkLine and refactored my code to try to keep things relevant to the question.
var fs = require('fs');
const readline = require('readline');

const files = { open: 0 };
const wait = ms => new Promise(r => setTimeout(r, ms));

//for debugging purposes, just looking at how many files are open
setInterval(() => console.log(files.open), 500);

async function stallIfNeeded() {
    while (files.open > 50) {
        await wait(500);
    }
}

async function checkLineExists(line) {
    await stallIfNeeded();
    let readStream = fs.createReadStream("./largeFileOfUncertianValues.csv");
    files.open = files.open + 1;
    const rl = readline.createInterface({
        input: readStream,
        crlfDelay: Infinity
    });
    const hasLine = await new Promise(async res => {
        let found = false;
        rl.on('close', () => {
            files.open = files.open - 1;
            res(found);
        });
        rl.on("line", oline => {
            const hasLine = oline.includes(line);
            if (hasLine) {
                found = true;
                rl.close();
            }
        });
    });
    return hasLine;
}

(async () => {
    const sourceStream = fs.createReadStream('largeFileOfKnownValues.csv');
    const sourceRl = readline.createInterface({
        input: sourceStream,
        crlfDelay: Infinity
    });
    let writeStream = fs.createWriteStream("missing.csv");
    let cntr = 0;
    await new Promise(sRes => {
        sourceRl.on("line", async line => {
            //these are conditions left out for simplicity.
            //I need to format every line and check to make sure it is
            //something I want to search for.
            const lineFormatted = formatLine(line);
            const skipLine = checkLine(lineFormatted);
            if (skipLine) return;
            const exists = await checkLineExists(lineFormatted);
            if (!exists) console.log("doesn't exists");
            if (!exists) writeStream.write(line + "\n");
            if (++cntr % 50 == 0) {
                console.log("another 50 done");
                console.log(cntr + " so far");
            }
        });
        sourceRl.on("close", sRes);
    });
})();
If you really want to minimize memory usage and/or support arbitrarily large files with good performance, then you really should load one of the files into some kind of database that can do indexed lookups on the data. Then you loop through the other file line by line, normalize the line, and run a query to see whether it exists in the other data.
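For example, here is a minimal sketch of that database approach, assuming the better-sqlite3 package (the package choice, database filename, and table name are illustrative only, not part of the original answer); any store with an indexed column would work the same way:

// Sketch only: assumes `npm install better-sqlite3`.
const fs = require('fs');
const readline = require('readline');
const Database = require('better-sqlite3');

function normalizeLine(line) {
    return line.toLowerCase();
}

async function compareWithDb(sourceFile, targetFile) {
    const db = new Database('lines.db');
    db.exec('CREATE TABLE IF NOT EXISTS source_lines (line TEXT PRIMARY KEY)');

    // Load the source file into the indexed table, batching inserts in transactions.
    const insert = db.prepare('INSERT OR IGNORE INTO source_lines (line) VALUES (?)');
    const insertMany = db.transaction(lines => {
        for (const l of lines) insert.run(l);
    });
    const batch = [];
    const source = readline.createInterface({ input: fs.createReadStream(sourceFile), crlfDelay: Infinity });
    for await (const line of source) {
        batch.push(normalizeLine(line));
        if (batch.length >= 10000) {
            insertMany(batch);
            batch.length = 0;
        }
    }
    if (batch.length) insertMany(batch);

    // Walk the target file and query for each line.
    const lookup = db.prepare('SELECT 1 FROM source_lines WHERE line = ?');
    const notFounds = [];
    const target = readline.createInterface({ input: fs.createReadStream(targetFile), crlfDelay: Infinity });
    for await (const line of target) {
        if (!lookup.get(normalizeLine(line))) {
            notFounds.push(line);
        }
    }
    db.close();
    return notFounds;
}

Because the line column is the PRIMARY KEY, each lookup is an indexed query rather than a scan, so the per-line cost stays roughly constant no matter how large the source file gets.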
Short of that, here are a couple of approaches without a database. The first loads one set of data into an in-memory Set object and then loops through the second data set line by line, checking whether each line is in the Set (essentially an in-memory database lookup).
const fs = require('fs');
const readline = require('readline');

function normalizeLine(line) {
    return line.toLowerCase();
}

async function compare(sourceFile, targetFile) {
    // read in all source lines into a normalized Set
    const source = readline.createInterface({ input: fs.createReadStream(sourceFile) });
    const sourceLines = new Set();
    for await (const line of source) {
        sourceLines.add(normalizeLine(line));
    }

    const notFounds = [];
    const target = readline.createInterface({ input: fs.createReadStream(targetFile) });
    for await (const line of target) {
        if (!sourceLines.has(normalizeLine(line))) {
            notFounds.push(line);
        }
    }
    return notFounds;
}

compare("source.txt", "target.txt").then(result => {
    if (!result.length) {
        console.log("All target lines found in source");
    } else {
        console.log("Not found in source", result);
    }
}).catch(err => {
    console.log(err);
});
The second uses your approach of looping through the first file line by line and, for each of those lines, looping through the second file line by line. This will be very slow for any large data set, but it will scale to arbitrarily large files.
It loops line by line using readline's async iterator interface, and it awaits the close of each inner stream to avoid any build-up of open files.
const fs = require('fs');
const readline = require('readline');

function normalizeLine(line) {
    return line.toLowerCase();
}
async function compare(sourceFile, targetFile) {
    // walk the target file line by line, checking each line against the source file
    const source = readline.createInterface({ input: fs.createReadStream(targetFile) });
    const notFounds = [];
    for await (const line of source) {
        let found = await findLineInFile(sourceFile, line);
        if (!found) {
            notFounds.push(line);
        }
    }
    return notFounds;
}
compare("source.txt", "target.txt").then(result => {
if (!result.length) {
console.log("All target lines found in source");
} else {
console.log("Not found in source", result);
}
}).catch(err => {
console.log(err);
});
function streamDestroy(stream) {
return new Promise((resolve, reject) => {
stream.once('close', resolve);
stream.once('error', reject);
stream.destroy();
});
}
async function findLineInFile(filename, targetLine) {
const lookLine = normalizeLine(targetLine);
const stream = fs.createReadStream(filename);
const source = readline.createInterface({ input: stream });
for await (const line of source) {
if (normalizeLine(line) === lookLine) {
await streamDestroy(stream);
return true;
}
}
return false;
}