NodeJS:使用管道从可读流写入文件会导致堆内存错误
NodeJS: Using Pipe To Write A File From A Readable Stream Gives Heap Memory Error
我正在尝试创建 1.5 亿行数据并将数据写入 csv 文件,以便我可以将数据插入到不同的数据库中,只需稍作修改。
我正在使用一些函数来生成看似随机的数据并将数据推送到可写流中。
我现在的代码无法成功处理内存问题。
经过几个小时的研究,我开始认为我不应该在 for 循环的末尾推送每个数据,因为管道方法似乎无法以这种方式处理垃圾收集。
此外,我发现了一些建议完全不要使用推送的 Whosebug 答案和 NodeJS 文档。
但是,我对 NodeJS 很陌生,我觉得自己被阻塞了,不知道如何从这里开始。
如果有人可以为我提供有关如何进行的任何指导并给我一个例子,我将不胜感激。
下面是我的代码的一部分,可以让您更好地理解我想要实现的目标。
P.S。 -
我找到了一种完全不用管道方法就可以成功处理内存问题的方法——我使用了 drain 事件——但我不得不从头开始,现在我很想知道是否有一个简单的方法在不完全更改这段代码的情况下处理此内存问题的方法。
此外,我一直试图避免使用任何库,因为我觉得应该有一个相对容易的调整来使这项工作在不使用库的情况下工作,但如果我错了请告诉我。提前谢谢你。
// This is my target number of data
const targetDataNum = 150000000;
// Create readable stream
const readableStream = new Stream.Readable({
read() {}
});
// Create writable stream
const writableStream = fs.createWriteStream('./database/RDBMS/test.csv');
// Write columns first
writableStream.write('id, body, date, dp\n', 'utf8');
// Then, push a number of data to the readable stream (150M in this case)
for (var i = 1; i <= targetDataNum; i += 1) {
const id = i;
const body = lorem.paragraph(1);
const date = randomDate(new Date(2014, 0, 1), new Date());
const dp = randomNumber(1, 1000);
const data = `${id},${body},${date},${dp}\n`;
readableStream.push(data, 'utf8');
};
// Pipe readable stream to writeable stream
readableStream.pipe(writableStream);
// End the stream
readableStream.push(null);
我建议尝试如下解决方案:
const { Readable } = require('readable-stream');
class CustomReadable extends Readable {
constructor(max, options = {}) {
super(options);
this.targetDataNum = max;
this.i = 1;
}
_read(size) {
if (i <= this.targetDataNum) {
// your code to build the csv content
this.push(data, 'utf8');
return;
}
this.push(null);
}
}
const rs = new CustomReadable(150000000);
rs.pipe(ws);
只需用您的代码部分完成它即可填充 csv 并创建可写流。
使用此解决方案,您可以将 rs.push
方法调用保留为调用内部 _read
流方法,直到 this.push(null)
未被调用。可能是在您填充内部流缓冲区太快之前,在循环中手动调用 push
导致出现内存错误。
由于您是流的新手,也许可以从更简单的抽象开始:generators。生成器仅在数据被使用时生成数据(就像 Streams 应该的那样),但它们没有缓冲和复杂的构造函数和方法。
这只是您的 for
循环,已移至生成器函数中:
function * generateData(targetDataNum) {
for (var i = 1; i <= targetDataNum; i += 1) {
const id = i;
const body = lorem.paragraph(1);
const date = randomDate(new Date(2014, 0, 1), new Date());
const dp = randomNumber(1, 1000);
yield `${id},${body},${date},${dp}\n`;
}
}
在 Node 12 中,您可以创建一个 Readable
流 directly from any iterable,包括生成器和异步生成器:
const stream = Readable.from(generateData(), {encoding: 'utf8'})
stream.pipe(writableStream)
在开始将数据泵入 ReadableStream
和 yield
之前尝试 pipe
ing 到 WritableStream
,然后再写入下一个 chunk
。
...
// Write columns first
writableStream.write('id, body, date, dp\n', 'utf8');
// Pipe readable stream to writeable stream
readableStream.pipe(writableStream);
// Then, push a number of data to the readable stream (150M in this case)
for (var i = 1; i <= targetDataNum; i += 1) {
const id = i;
const body = lorem.paragraph(1);
const date = randomDate(new Date(2014, 0, 1), new Date());
const dp = randomNumber(1, 1000);
const data = `${id},${body},${date},${dp}\n`;
readableStream.push(data, 'utf8');
// somehow YIELD for the STREAM to drain out.
};
...
Node.js 的整个 Stream
实现依赖于 wire
速度慢并且 CPU 实际上可以在下一个 chunk
数据来自 stream
source
或直到下一个 chunk
数据已 written
到 stream
destination
。
在当前的实施中,由于 for-loop
已经预订了 CPU,因此 pipe
将数据实际发送到 writestream
没有停机时间.如果你 watch
cat test.csv
你将能够抓住这个,当循环是 运行.
时它不会改变
正如(我确定)您知道的那样,pipe
有助于 gua运行 确保您正在使用的数据仅在 chunks
中缓冲在内存中,而不是作为所有的。但是 gua运行tee 只有在 CPU 有足够的停机时间来实际耗尽数据时才成立。
说了这么多,我把你的整个代码包装成一个 async
IIFE
和 运行 它用一个 await
一个 setTimeout
确保我 yield
为 stream
到 drain
的数据。
let fs = require('fs');
let Stream = require('stream');
(async function () {
// This is my target number of data
const targetDataNum = 150000000;
// Create readable stream
const readableStream = new Stream.Readable({
read() { }
});
// Create writable stream
const writableStream = fs.createWriteStream('./test.csv');
// Write columns first
writableStream.write('id, body, date, dp\n', 'utf8');
// Pipe readable stream to writeable stream
readableStream.pipe(writableStream);
// Then, push a number of data to the readable stream (150M in this case)
for (var i = 1; i <= targetDataNum; i += 1) {
console.log(`Pushing ${i}`);
const id = i;
const body = `body${i}`;
const date = `date${i}`;
const dp = `dp${i}`;
const data = `${id},${body},${date},${dp}\n`;
readableStream.push(data, 'utf8');
await new Promise(resolve => setImmediate(resolve));
};
// End the stream
readableStream.push(null);
})();
这就是 top
一直以来的样子我 运行 这个。
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
15213 binaek ** ** ****** ***** ***** * ***.* 0.5 *:**.** node
注意 %MEM
或多或少保持静态。
您 运行 内存不足,因为您在将任何数据写入磁盘之前在内存中预先生成了所有数据。相反,您需要一种在生成时编写的策略,这样您就不必在内存中保存大量数据。
这里似乎不需要 .pipe()
,因为您控制数据的生成(它不是来自某个随机读取流)。
因此,您可以只生成数据并立即写入,并在需要时处理 drain 事件。这是一个可运行的示例(这会创建一个非常大的文件):
const {once} = require('events');
const fs = require('fs');
// This is my target number of data
const targetDataNum = 150000000;
async function run() {
// Create writable stream
const writableStream = fs.createWriteStream('./test.csv');
// Write columns first
writableStream.write('id, body, date, dp\n', 'utf8');
// Then, push a number of data to the readable stream (150M in this case)
for (let i = 1; i <= targetDataNum; i += 1) {
const id = i;
const body = lorem.paragraph(1);
const date = randomDate(new Date(2014, 0, 1), new Date());
const dp = randomNumber(1, 1000);
const data = `${id},${body},${date},${dp}\n`;
const canWriteMore = writableStream.write(data);
if (!canWriteMore) {
// wait for stream to be ready for more writing
await once(writableStream, "drain");
}
}
writableStream.end();
}
run().then(() => {
console.log(done);
}).catch(err => {
console.log("got rejection: ", err);
});
// placeholders for the functions that were being used
function randomDate(low, high) {
let rand = randomNumber(low.getTime(), high.getTime());
return new Date(rand);
}
function randomNumber(low, high) {
return Math.floor(Math.random() * (high - low)) + low;
}
const lorem = {
paragraph: function() {
return "random paragraph";
}
}
我正在尝试创建 1.5 亿行数据并将数据写入 csv 文件,以便我可以将数据插入到不同的数据库中,只需稍作修改。
我正在使用一些函数来生成看似随机的数据并将数据推送到可写流中。
我现在的代码无法成功处理内存问题。
经过几个小时的研究,我开始认为我不应该在 for 循环的末尾推送每个数据,因为管道方法似乎无法以这种方式处理垃圾收集。
此外,我发现了一些建议完全不要使用推送的 Whosebug 答案和 NodeJS 文档。
但是,我对 NodeJS 很陌生,我觉得自己被阻塞了,不知道如何从这里开始。
如果有人可以为我提供有关如何进行的任何指导并给我一个例子,我将不胜感激。
下面是我的代码的一部分,可以让您更好地理解我想要实现的目标。
P.S。 -
我找到了一种完全不用管道方法就可以成功处理内存问题的方法——我使用了 drain 事件——但我不得不从头开始,现在我很想知道是否有一个简单的方法在不完全更改这段代码的情况下处理此内存问题的方法。
此外,我一直试图避免使用任何库,因为我觉得应该有一个相对容易的调整来使这项工作在不使用库的情况下工作,但如果我错了请告诉我。提前谢谢你。
// This is my target number of data
const targetDataNum = 150000000;
// Create readable stream
const readableStream = new Stream.Readable({
read() {}
});
// Create writable stream
const writableStream = fs.createWriteStream('./database/RDBMS/test.csv');
// Write columns first
writableStream.write('id, body, date, dp\n', 'utf8');
// Then, push a number of data to the readable stream (150M in this case)
for (var i = 1; i <= targetDataNum; i += 1) {
const id = i;
const body = lorem.paragraph(1);
const date = randomDate(new Date(2014, 0, 1), new Date());
const dp = randomNumber(1, 1000);
const data = `${id},${body},${date},${dp}\n`;
readableStream.push(data, 'utf8');
};
// Pipe readable stream to writeable stream
readableStream.pipe(writableStream);
// End the stream
readableStream.push(null);
我建议尝试如下解决方案:
const { Readable } = require('readable-stream');
class CustomReadable extends Readable {
constructor(max, options = {}) {
super(options);
this.targetDataNum = max;
this.i = 1;
}
_read(size) {
if (i <= this.targetDataNum) {
// your code to build the csv content
this.push(data, 'utf8');
return;
}
this.push(null);
}
}
const rs = new CustomReadable(150000000);
rs.pipe(ws);
只需用您的代码部分完成它即可填充 csv 并创建可写流。
使用此解决方案,您可以将 rs.push
方法调用保留为调用内部 _read
流方法,直到 this.push(null)
未被调用。可能是在您填充内部流缓冲区太快之前,在循环中手动调用 push
导致出现内存错误。
由于您是流的新手,也许可以从更简单的抽象开始:generators。生成器仅在数据被使用时生成数据(就像 Streams 应该的那样),但它们没有缓冲和复杂的构造函数和方法。
这只是您的 for
循环,已移至生成器函数中:
function * generateData(targetDataNum) {
for (var i = 1; i <= targetDataNum; i += 1) {
const id = i;
const body = lorem.paragraph(1);
const date = randomDate(new Date(2014, 0, 1), new Date());
const dp = randomNumber(1, 1000);
yield `${id},${body},${date},${dp}\n`;
}
}
在 Node 12 中,您可以创建一个 Readable
流 directly from any iterable,包括生成器和异步生成器:
const stream = Readable.from(generateData(), {encoding: 'utf8'})
stream.pipe(writableStream)
在开始将数据泵入 ReadableStream
和 yield
之前尝试 pipe
ing 到 WritableStream
,然后再写入下一个 chunk
。
...
// Write columns first
writableStream.write('id, body, date, dp\n', 'utf8');
// Pipe readable stream to writeable stream
readableStream.pipe(writableStream);
// Then, push a number of data to the readable stream (150M in this case)
for (var i = 1; i <= targetDataNum; i += 1) {
const id = i;
const body = lorem.paragraph(1);
const date = randomDate(new Date(2014, 0, 1), new Date());
const dp = randomNumber(1, 1000);
const data = `${id},${body},${date},${dp}\n`;
readableStream.push(data, 'utf8');
// somehow YIELD for the STREAM to drain out.
};
...
Node.js 的整个 Stream
实现依赖于 wire
速度慢并且 CPU 实际上可以在下一个 chunk
数据来自 stream
source
或直到下一个 chunk
数据已 written
到 stream
destination
。
在当前的实施中,由于 for-loop
已经预订了 CPU,因此 pipe
将数据实际发送到 writestream
没有停机时间.如果你 watch
cat test.csv
你将能够抓住这个,当循环是 运行.
正如(我确定)您知道的那样,pipe
有助于 gua运行 确保您正在使用的数据仅在 chunks
中缓冲在内存中,而不是作为所有的。但是 gua运行tee 只有在 CPU 有足够的停机时间来实际耗尽数据时才成立。
说了这么多,我把你的整个代码包装成一个 async
IIFE
和 运行 它用一个 await
一个 setTimeout
确保我 yield
为 stream
到 drain
的数据。
let fs = require('fs');
let Stream = require('stream');
(async function () {
// This is my target number of data
const targetDataNum = 150000000;
// Create readable stream
const readableStream = new Stream.Readable({
read() { }
});
// Create writable stream
const writableStream = fs.createWriteStream('./test.csv');
// Write columns first
writableStream.write('id, body, date, dp\n', 'utf8');
// Pipe readable stream to writeable stream
readableStream.pipe(writableStream);
// Then, push a number of data to the readable stream (150M in this case)
for (var i = 1; i <= targetDataNum; i += 1) {
console.log(`Pushing ${i}`);
const id = i;
const body = `body${i}`;
const date = `date${i}`;
const dp = `dp${i}`;
const data = `${id},${body},${date},${dp}\n`;
readableStream.push(data, 'utf8');
await new Promise(resolve => setImmediate(resolve));
};
// End the stream
readableStream.push(null);
})();
这就是 top
一直以来的样子我 运行 这个。
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
15213 binaek ** ** ****** ***** ***** * ***.* 0.5 *:**.** node
注意 %MEM
或多或少保持静态。
您 运行 内存不足,因为您在将任何数据写入磁盘之前在内存中预先生成了所有数据。相反,您需要一种在生成时编写的策略,这样您就不必在内存中保存大量数据。
这里似乎不需要 .pipe()
,因为您控制数据的生成(它不是来自某个随机读取流)。
因此,您可以只生成数据并立即写入,并在需要时处理 drain 事件。这是一个可运行的示例(这会创建一个非常大的文件):
const {once} = require('events');
const fs = require('fs');
// This is my target number of data
const targetDataNum = 150000000;
async function run() {
// Create writable stream
const writableStream = fs.createWriteStream('./test.csv');
// Write columns first
writableStream.write('id, body, date, dp\n', 'utf8');
// Then, push a number of data to the readable stream (150M in this case)
for (let i = 1; i <= targetDataNum; i += 1) {
const id = i;
const body = lorem.paragraph(1);
const date = randomDate(new Date(2014, 0, 1), new Date());
const dp = randomNumber(1, 1000);
const data = `${id},${body},${date},${dp}\n`;
const canWriteMore = writableStream.write(data);
if (!canWriteMore) {
// wait for stream to be ready for more writing
await once(writableStream, "drain");
}
}
writableStream.end();
}
run().then(() => {
console.log(done);
}).catch(err => {
console.log("got rejection: ", err);
});
// placeholders for the functions that were being used
function randomDate(low, high) {
let rand = randomNumber(low.getTime(), high.getTime());
return new Date(rand);
}
function randomNumber(low, high) {
return Math.floor(Math.random() * (high - low)) + low;
}
const lorem = {
paragraph: function() {
return "random paragraph";
}
}