NodeJs 流 - 内存不足
NodeJs Stream - Out of Memory
我正在尝试处理 3 亿行数据流。一旦达到大约 500 万行,我就会收到 致命错误:CALL_AND_RETRY_LAST 分配失败 - 进程内存不足 。 (数量因机器而异,但一直在发生)。
您可以 运行 下面的代码来查看这种情况 - 我无法判断代码中的问题是否是流固有的。我试图愚弄这个过程,但我做不到。
有内存限制吗?我删除了所有其他代码和 'dumbed' 示例以确保它不是一些背压问题。
var Readable = require('stream').Readable;
var Writable = require('stream').Writable;
var util = require('util');
var tenMillion = 10000000;
//var tenMillion = 5000000; //THIS WORKS
var writeEvery = tenMillion / 10;
/*
* Create a really simple stream that will run 10 million times
*/
function Streamo(max) {
Readable.call(this, { objectMode: true });
this._currentIndex = -1;
this._maxIndex = max;
}
util.inherits(Streamo, Readable);
Streamo.prototype._read = function () {
this._currentIndex += 1;
if (this._currentIndex % writeEvery == 0) {
console.log(this._currentIndex + ' of ' + this._maxIndex)
};
if (this._currentIndex < 0 || this._currentIndex >= this._maxIndex) {
console.log("BOOM")
this.push(null);
return;
}
this.push(true);
};
/*
* Create a really simple Writable Stream to Count
*/
function Counta() {
Writable.call(this, { objectMode: true, highWaterMark: (200 * 1024) });
this._count = 0;
}
util.inherits(Counta, Writable);
Counta.prototype._write = function (chunk, enc, cb) {
this._count++;
if (this._count % writeEvery == 0) {
console.log('_______________________________' + this._count)
};
cb();
};
Counta.prototype.Count = function () {
return this._count;
}
/*
* Exercise It
*/
var s = new Streamo(tenMillion);
var c = new Counta();
s.pipe(c);
c.on('finish', function () {
console.log("BOOM BOOM BOOM BOOM BOOM BOOM BOOM BOOM BOOM ")
});
这是 known issue 当前流实现。
在流文档和代码中,multiple places 暗示 _read()
应该是异步的。
因此,如果您实际上并未在 _read()
实施中执行某种(异步)i/o,那么您可能需要(至少偶尔)在之前调用 setImmediate()
push()
ing,以防止调用堆栈变得太大。例如,这可以正常工作而不会崩溃:
Streamo.prototype._read = function (n) {
this._currentIndex += 1;
if (this._currentIndex % writeEvery == 0) {
console.log(this._currentIndex + ' of ' + this._maxIndex)
};
if (this._currentIndex < 0 || this._currentIndex >= this._maxIndex) {
console.log("BOOM")
this.push(null);
return;
}
var self = this;
if (this._currentIndex % writeEvery == 0) {
setImmediate(function() {
self.push(true);
});
} else
this.push(true);
};
我正在尝试处理 3 亿行数据流。一旦达到大约 500 万行,我就会收到 致命错误:CALL_AND_RETRY_LAST 分配失败 - 进程内存不足 。 (数量因机器而异,但一直在发生)。
您可以 运行 下面的代码来查看这种情况 - 我无法判断代码中的问题是否是流固有的。我试图愚弄这个过程,但我做不到。
有内存限制吗?我删除了所有其他代码和 'dumbed' 示例以确保它不是一些背压问题。
var Readable = require('stream').Readable;
var Writable = require('stream').Writable;
var util = require('util');
var tenMillion = 10000000;
//var tenMillion = 5000000; //THIS WORKS
var writeEvery = tenMillion / 10;
/*
* Create a really simple stream that will run 10 million times
*/
function Streamo(max) {
Readable.call(this, { objectMode: true });
this._currentIndex = -1;
this._maxIndex = max;
}
util.inherits(Streamo, Readable);
Streamo.prototype._read = function () {
this._currentIndex += 1;
if (this._currentIndex % writeEvery == 0) {
console.log(this._currentIndex + ' of ' + this._maxIndex)
};
if (this._currentIndex < 0 || this._currentIndex >= this._maxIndex) {
console.log("BOOM")
this.push(null);
return;
}
this.push(true);
};
/*
* Create a really simple Writable Stream to Count
*/
function Counta() {
Writable.call(this, { objectMode: true, highWaterMark: (200 * 1024) });
this._count = 0;
}
util.inherits(Counta, Writable);
Counta.prototype._write = function (chunk, enc, cb) {
this._count++;
if (this._count % writeEvery == 0) {
console.log('_______________________________' + this._count)
};
cb();
};
Counta.prototype.Count = function () {
return this._count;
}
/*
* Exercise It
*/
var s = new Streamo(tenMillion);
var c = new Counta();
s.pipe(c);
c.on('finish', function () {
console.log("BOOM BOOM BOOM BOOM BOOM BOOM BOOM BOOM BOOM ")
});
这是 known issue 当前流实现。
在流文档和代码中,multiple places 暗示 _read()
应该是异步的。
因此,如果您实际上并未在 _read()
实施中执行某种(异步)i/o,那么您可能需要(至少偶尔)在之前调用 setImmediate()
push()
ing,以防止调用堆栈变得太大。例如,这可以正常工作而不会崩溃:
Streamo.prototype._read = function (n) {
this._currentIndex += 1;
if (this._currentIndex % writeEvery == 0) {
console.log(this._currentIndex + ' of ' + this._maxIndex)
};
if (this._currentIndex < 0 || this._currentIndex >= this._maxIndex) {
console.log("BOOM")
this.push(null);
return;
}
var self = this;
if (this._currentIndex % writeEvery == 0) {
setImmediate(function() {
self.push(true);
});
} else
this.push(true);
};