Node.js 可读流随着时间的推移变慢,CPU 使用率下降
Node.js Readable stream slows over time, CPU usage falls
我正在尝试启动一个集群,该集群将从 google 云存储流式传输文件(新行分隔 JSON),并在从 MongoDB 获取数据后转换每一行。转换行后,我想将其存储在 Google 的 bigquery - 一次 10000 行。所有这一切都工作正常,但问题是流式文件的处理速度随着时间的推移显着下降。
我在一台服务器上设置了节点应用程序,在另一台服务器上设置了 mongodb。两台 8 核机器均配备 30GB 内存。执行脚本时,最初 CPU 应用程序服务器和 mongodb 服务器的使用率约为 70%-75%。 30 分钟后,CPU 使用率下降到 10%,最后下降到 1%。该应用程序不会产生任何异常。我可以查看应用程序日志,发现它处理完了一些文件并占用了新文件进行处理。可以观察到一次执行比 3:00PM 晚一点,几乎可以观察到 5:20PM.
var cluster = require('cluster'),
os = require('os'),
numCPUs = os.cpus().length,
async = require('async'),
fs = require('fs'),
google = require('googleapis'),
bigqueryV2 = google.bigquery('v2'),
gcs = require('@google-cloud/storage')({
projectId: 'someproject',
keyFilename: __dirname + '/auth.json'
}),
dataset = bigquery.dataset('somedataset'),
bucket = gcs.bucket('somebucket.appspot.com'),
JSONStream = require('JSONStream'),
Transform = require('stream').Transform,
MongoClient = require('mongodb').MongoClient,
mongoUrl = 'mongodb://localhost:27017/bigquery',
mDb,
groupA,
groupB;
var rows = [],
rowsLen = 0;
function transformer() {
var t = new Transform({ objectMode: true });
t._transform = function(row, encoding, cb) {
// Get some information from mongodb and attach it to the row
if (row) {
groupA.findOne({
'geometry': { $geoIntersects: { $geometry: { type: 'Point', coordinates: [row.lon, row.lat] } } }
}, {
fields: { 'properties.OA_SA': 1, _id: 0 }
}, function(err, a) {
if (err) return cb();
groupB.findOne({
'geometry': { $geoIntersects: { $geometry: { type: 'Point', coordinates: [row.lon, row.lat] } } }
}, {
fields: { 'properties.WZ11CD': 1, _id: 0 }
}, function(err, b) {
if (err) return cb();
row.groupA = a ? a.properties.OA_SA : null;
row.groupB = b ? b.properties.WZ11CD : null;
// cache processed rows in memory
rows[rowsLen++] = { json: row };
if (rowsLen >= 10000) {
// batch insert rows in bigquery table
// and free memory
log('inserting 10000')
insertRowsAsStream(rows.splice(0, 10000));
rowsLen = rows.length;
}
cb();
});
});
} else {
cb();
}
};
return t;
}
var log = function(str) {
console.log(str);
}
function insertRowsAsStream(rows, callback) {
bigqueryV2.tabledata.insertAll({
"projectId": 'someproject',
"datasetId": 'somedataset',
"tableId": 'sometable',
"resource": {
"kind": "bigquery#tableDataInsertAllRequest",
"rows": rows
}
}, function(err, res) {
if (res && res.insertErrors && res.insertErrors.length) {
console.log(res.insertErrors[0].errors)
err = err || new Error(JSON.stringify(res.insertErrors));
}
});
}
function startStream(fileName, cb) {
// stream a file from Google cloud storage
var file = bucket.file(fileName),
called = false;
log(`Processing file ${fileName}`);
file.createReadStream()
.on('data', noop)
.on('end', function() {
if (!called) {
called = true;
cb();
}
})
.pipe(JSONStream.parse())
.pipe(transformer())
.on('finish', function() {
log('transformation ended');
if (!called) {
called = true;
cb();
}
});
}
function processFiles(files, cpuIdentifier) {
if (files.length == 0) return;
var fn = [];
for (var i = 0; i < files.length; i++) {
fn.push(function(cb) {
startStream(files.pop(), cb);
});
}
// process 3 files in parallel
async.parallelLimit(fn, 3, function() {
log(`child process ${cpuIdentifier} completed the task`);
fs.appendFile(__dirname + '/complete_count.txt', '1');
});
}
if (cluster.isMaster) {
for (var ii = 0; ii < numCPUs; ii++) {
cluster.fork();
}
} else {
MongoClient.connect(mongoUrl, function(err, db) {
if (err) throw (err);
mDb = db;
groupA = mDb.collection('groupageo');
groupB = mDb.collection('groupbgeo');
processFiles(files, process.pid);
// `files` is an array of file names
// each file is in newline json delimited format
// ["1478854974993/000000000000.json","1478854974993/000000000001.json","1478854974993/000000000002.json","1478854974993/000000000003.json","1478854974993/000000000004.json","1478854974993/000000000005.json"]
});
}
好的,我找到罪魁祸首了! Google API Node.js 客户端库使用名为 "stream-events" 的模块,该模块实现了 Streams 0.8。 Streams 0.8 不会根据消费者使用数据的能力来控制它发出 'data' 事件的速率。速率控制功能是在 Streams 1.0 中引入的。所以这实质上意味着可读流以 MongoDB 的速度抛出数据,其速度无法处理。
解决方法:
我使用 'request' 模块而不是 Google 的客户端库。我向请求模块提供了一个已签名的 URL,该模块又将结果作为流获取,我可以将其通过管道传输到我的转换器中。
带走:
始终检查您使用的模块是否适用于他们正在使用的流版本。
我正在尝试启动一个集群,该集群将从 google 云存储流式传输文件(新行分隔 JSON),并在从 MongoDB 获取数据后转换每一行。转换行后,我想将其存储在 Google 的 bigquery - 一次 10000 行。所有这一切都工作正常,但问题是流式文件的处理速度随着时间的推移显着下降。
我在一台服务器上设置了节点应用程序,在另一台服务器上设置了 mongodb。两台 8 核机器均配备 30GB 内存。执行脚本时,最初 CPU 应用程序服务器和 mongodb 服务器的使用率约为 70%-75%。 30 分钟后,CPU 使用率下降到 10%,最后下降到 1%。该应用程序不会产生任何异常。我可以查看应用程序日志,发现它处理完了一些文件并占用了新文件进行处理。可以观察到一次执行比 3:00PM 晚一点,几乎可以观察到 5:20PM.
var cluster = require('cluster'),
os = require('os'),
numCPUs = os.cpus().length,
async = require('async'),
fs = require('fs'),
google = require('googleapis'),
bigqueryV2 = google.bigquery('v2'),
gcs = require('@google-cloud/storage')({
projectId: 'someproject',
keyFilename: __dirname + '/auth.json'
}),
dataset = bigquery.dataset('somedataset'),
bucket = gcs.bucket('somebucket.appspot.com'),
JSONStream = require('JSONStream'),
Transform = require('stream').Transform,
MongoClient = require('mongodb').MongoClient,
mongoUrl = 'mongodb://localhost:27017/bigquery',
mDb,
groupA,
groupB;
var rows = [],
rowsLen = 0;
function transformer() {
var t = new Transform({ objectMode: true });
t._transform = function(row, encoding, cb) {
// Get some information from mongodb and attach it to the row
if (row) {
groupA.findOne({
'geometry': { $geoIntersects: { $geometry: { type: 'Point', coordinates: [row.lon, row.lat] } } }
}, {
fields: { 'properties.OA_SA': 1, _id: 0 }
}, function(err, a) {
if (err) return cb();
groupB.findOne({
'geometry': { $geoIntersects: { $geometry: { type: 'Point', coordinates: [row.lon, row.lat] } } }
}, {
fields: { 'properties.WZ11CD': 1, _id: 0 }
}, function(err, b) {
if (err) return cb();
row.groupA = a ? a.properties.OA_SA : null;
row.groupB = b ? b.properties.WZ11CD : null;
// cache processed rows in memory
rows[rowsLen++] = { json: row };
if (rowsLen >= 10000) {
// batch insert rows in bigquery table
// and free memory
log('inserting 10000')
insertRowsAsStream(rows.splice(0, 10000));
rowsLen = rows.length;
}
cb();
});
});
} else {
cb();
}
};
return t;
}
var log = function(str) {
console.log(str);
}
function insertRowsAsStream(rows, callback) {
bigqueryV2.tabledata.insertAll({
"projectId": 'someproject',
"datasetId": 'somedataset',
"tableId": 'sometable',
"resource": {
"kind": "bigquery#tableDataInsertAllRequest",
"rows": rows
}
}, function(err, res) {
if (res && res.insertErrors && res.insertErrors.length) {
console.log(res.insertErrors[0].errors)
err = err || new Error(JSON.stringify(res.insertErrors));
}
});
}
function startStream(fileName, cb) {
// stream a file from Google cloud storage
var file = bucket.file(fileName),
called = false;
log(`Processing file ${fileName}`);
file.createReadStream()
.on('data', noop)
.on('end', function() {
if (!called) {
called = true;
cb();
}
})
.pipe(JSONStream.parse())
.pipe(transformer())
.on('finish', function() {
log('transformation ended');
if (!called) {
called = true;
cb();
}
});
}
function processFiles(files, cpuIdentifier) {
if (files.length == 0) return;
var fn = [];
for (var i = 0; i < files.length; i++) {
fn.push(function(cb) {
startStream(files.pop(), cb);
});
}
// process 3 files in parallel
async.parallelLimit(fn, 3, function() {
log(`child process ${cpuIdentifier} completed the task`);
fs.appendFile(__dirname + '/complete_count.txt', '1');
});
}
if (cluster.isMaster) {
for (var ii = 0; ii < numCPUs; ii++) {
cluster.fork();
}
} else {
MongoClient.connect(mongoUrl, function(err, db) {
if (err) throw (err);
mDb = db;
groupA = mDb.collection('groupageo');
groupB = mDb.collection('groupbgeo');
processFiles(files, process.pid);
// `files` is an array of file names
// each file is in newline json delimited format
// ["1478854974993/000000000000.json","1478854974993/000000000001.json","1478854974993/000000000002.json","1478854974993/000000000003.json","1478854974993/000000000004.json","1478854974993/000000000005.json"]
});
}
好的,我找到罪魁祸首了! Google API Node.js 客户端库使用名为 "stream-events" 的模块,该模块实现了 Streams 0.8。 Streams 0.8 不会根据消费者使用数据的能力来控制它发出 'data' 事件的速率。速率控制功能是在 Streams 1.0 中引入的。所以这实质上意味着可读流以 MongoDB 的速度抛出数据,其速度无法处理。
解决方法: 我使用 'request' 模块而不是 Google 的客户端库。我向请求模块提供了一个已签名的 URL,该模块又将结果作为流获取,我可以将其通过管道传输到我的转换器中。
带走: 始终检查您使用的模块是否适用于他们正在使用的流版本。