只要有足够的 RAM 可用,大量的回调是否会中断脚本或继续?
Will huge amount of callbacks break script or continue whenever enough RAM is available?
我有一个函数可以从数据库中获取线程(gmail 对话)ID,然后向 Google API 询问每个线程 ID 的所有数据。一旦它接收到一个线程对象,它就将它存储到数据库中。这适用于我的收件箱,其中包含 ~1k 条消息。但我不确定它是否适用于消息超过 10 万条的帐户。
现在我要问的是,一旦机器内存不足,它会崩溃还是会在再次有足够的 RAM 可用时继续执行回调函数?我是否应该修改此代码以逐部分执行此操作(在某些时候重新运行整个脚本并从上次结束的地方继续使用新的 RAM?)
function eachThread(auth) {
var gmail = google.gmail('v1');
MongoClient.connect(mongoUrl, function(err, db){
assert.equal(null, err);
var collection = db.collection('threads');
// Find all data in collection and convert it to array
collection.find().toArray(function(err, docs){
assert.equal(null, err);
var threadContents = [];
// For each doc in array...
for (var i = 0; i < docs.length; i++) {
gmail
.users
.threads
.get( {auth:auth,'userId':'me', 'id':docs[i].id}, function(err, resp){
assert.equal(null, err);
threadContents.push(resp);
console.log(threadContents.length);
console.log(threadContents[threadContents.length - 1].id);
var anotherCollection = db.collection('threadContents');
anotherCollection.updateOne(
{id: threadContents[threadContents.length - 1].id},
threadContents[threadContents.length - 1],
{upsert:true},
function(err, result){
assert.equal(null, err);
console.log('updated one.');
});
if (threadContents.length === docs.length) {
console.log('Length matches!');
db.close();
}
});//end(callback(threads.get))
}//end(for(docs.length))
});//end(find.toArray)
});//end(callback(mongo.connect))
}//end(func(eachThread))
Now what I am asking, once a machine runs out of memory, will it break or will it continue executing callback functions whenever enough RAM is available again?
如果您 运行 内存不足,OS 将终止您的进程。在 Linux 中,您将看到 OOM(内存不足)。所以是的,它会坏的。
在这些情况下,您可以考虑使用流或生成器,以便在内存中只保留需要处理的数据块。
在您的情况下 MongoDB
在 find
方法 https://mongodb.github.io/node-mongodb-native/2.0/tutorials/streams/
上提供流
像这样的东西应该可以工作:
var collection = db.collection('threads');
var cursor = collection.find()
cursor.on('data', function(doc) {
gmail
.users
.threads
.get( {auth:auth,'userId':'me', 'id': doc.id}, function(err, resp) {
...
})
})
如果您不获取所有内容并将其推送到数组,您就不会 运行 内存不足。此外,我不会在循环内的每个元素上实例化相同的对象。
这里是不会 运行 内存不足的代码示例,但是它是即发即弃的意思是当它完成时你不会得到回调等。如果你想这样做你会需要使用 promises/async.
// Fire-and-forget type of function
// Will not run out of memory, GC will take care of that
function eachThread(auth, cb) {
var gmail = google.gmail('v1');
MongoClient.connect(mongoUrl, (err, db) => {
if (err) {
return cb(err);
}
var threadsCollection = db.collection('threads').find();
var contentsCollection = db.collection('threadContents');
threadsCollection.on('data', (doc) => {
gmail.users.threads.get({ auth: auth, 'userId': 'me', 'id': doc.id }, (err, res) => {
if (err) {
return cb(err);
}
contentsCollection.updateOne({ id: doc.id }, res, { upsert: true }, (err, result) => {
if (err) {
return cb(err);
}
});
});
});
threadsCollection.on('end', () => { db.close() });
});
}
将 for
循环替换为 async.mapLimit
足以逐个添加功能。我还冒昧地将 anotherCollection
创建与 collection
一起移动,因为打开连接一次比打开它数百次甚至数千次要好。
我还用 callback(err)
替换了你的 assert.equal
。 async
的功能将理解它应该停止一切,它允许您干净地退出而不是抛出异常。
编辑:
正如@chernando 所说,使用 collection.find().toArray
会将整个集合导入 RAM。逐个部分进行处理的更好方法是流式传输数据,或者要求数据库按块提供数据。
此版本假设您有足够的内存来 collection.find().toArray
正常工作。
我可能会在稍后有空的时候带着我在评论中谈到的工具的改编回来。
var async = require('async');
function eachThread(auth) {
var gmail = google.gmail('v1'),
limit = 100; //Size of the parts
MongoClient.connect(mongoUrl, function(err, db){
assert.equal(null, err);
var collection = db.collection('threads'),
anotherCollection = db.collection('threadContents');
// Find all data in collection and convert it to array
collection.find().toArray(function(err, docs){
assert.equal(null, err);
var threadContents = [];
//Change here
async.mapLimit(docs, limit, (doc, callback) => {
gmail
.users
.threads
.get( {auth:auth,'userId':'me', 'id':docs[i].id}, function(err, resp){
if(err) {
return callback(err);
}
threadContents.push(resp);
console.log(threadContents.length);
console.log(threadContents[threadContents.length - 1].id);
anotherCollection.updateOne(
{id: threadContents[threadContents.length - 1].id},
threadContents[threadContents.length - 1],
{upsert:true},
function(err, result){
if(err) {
console.error(err);
} else {
console.log('updated one.');
}
callback(err);
});
});//end(callback(threads.get))
//Change here
}, (error) => {
if(error) {
console.error('Transfert stopped because of error:' + err);
} else {
console.log('Transfert successful');
}
});//end(async.mapLimit)
});//end(find.toArray)
});//end(callback(mongo.connect))
}//end(func(eachThread))
我有一个函数可以从数据库中获取线程(gmail 对话)ID,然后向 Google API 询问每个线程 ID 的所有数据。一旦它接收到一个线程对象,它就将它存储到数据库中。这适用于我的收件箱,其中包含 ~1k 条消息。但我不确定它是否适用于消息超过 10 万条的帐户。
现在我要问的是,一旦机器内存不足,它会崩溃还是会在再次有足够的 RAM 可用时继续执行回调函数?我是否应该修改此代码以逐部分执行此操作(在某些时候重新运行整个脚本并从上次结束的地方继续使用新的 RAM?)
function eachThread(auth) {
var gmail = google.gmail('v1');
MongoClient.connect(mongoUrl, function(err, db){
assert.equal(null, err);
var collection = db.collection('threads');
// Find all data in collection and convert it to array
collection.find().toArray(function(err, docs){
assert.equal(null, err);
var threadContents = [];
// For each doc in array...
for (var i = 0; i < docs.length; i++) {
gmail
.users
.threads
.get( {auth:auth,'userId':'me', 'id':docs[i].id}, function(err, resp){
assert.equal(null, err);
threadContents.push(resp);
console.log(threadContents.length);
console.log(threadContents[threadContents.length - 1].id);
var anotherCollection = db.collection('threadContents');
anotherCollection.updateOne(
{id: threadContents[threadContents.length - 1].id},
threadContents[threadContents.length - 1],
{upsert:true},
function(err, result){
assert.equal(null, err);
console.log('updated one.');
});
if (threadContents.length === docs.length) {
console.log('Length matches!');
db.close();
}
});//end(callback(threads.get))
}//end(for(docs.length))
});//end(find.toArray)
});//end(callback(mongo.connect))
}//end(func(eachThread))
Now what I am asking, once a machine runs out of memory, will it break or will it continue executing callback functions whenever enough RAM is available again?
如果您 运行 内存不足,OS 将终止您的进程。在 Linux 中,您将看到 OOM(内存不足)。所以是的,它会坏的。
在这些情况下,您可以考虑使用流或生成器,以便在内存中只保留需要处理的数据块。
在您的情况下 MongoDB
在 find
方法 https://mongodb.github.io/node-mongodb-native/2.0/tutorials/streams/
像这样的东西应该可以工作:
var collection = db.collection('threads');
var cursor = collection.find()
cursor.on('data', function(doc) {
gmail
.users
.threads
.get( {auth:auth,'userId':'me', 'id': doc.id}, function(err, resp) {
...
})
})
如果您不获取所有内容并将其推送到数组,您就不会 运行 内存不足。此外,我不会在循环内的每个元素上实例化相同的对象。
这里是不会 运行 内存不足的代码示例,但是它是即发即弃的意思是当它完成时你不会得到回调等。如果你想这样做你会需要使用 promises/async.
// Fire-and-forget type of function
// Will not run out of memory, GC will take care of that
function eachThread(auth, cb) {
var gmail = google.gmail('v1');
MongoClient.connect(mongoUrl, (err, db) => {
if (err) {
return cb(err);
}
var threadsCollection = db.collection('threads').find();
var contentsCollection = db.collection('threadContents');
threadsCollection.on('data', (doc) => {
gmail.users.threads.get({ auth: auth, 'userId': 'me', 'id': doc.id }, (err, res) => {
if (err) {
return cb(err);
}
contentsCollection.updateOne({ id: doc.id }, res, { upsert: true }, (err, result) => {
if (err) {
return cb(err);
}
});
});
});
threadsCollection.on('end', () => { db.close() });
});
}
将 for
循环替换为 async.mapLimit
足以逐个添加功能。我还冒昧地将 anotherCollection
创建与 collection
一起移动,因为打开连接一次比打开它数百次甚至数千次要好。
我还用 callback(err)
替换了你的 assert.equal
。 async
的功能将理解它应该停止一切,它允许您干净地退出而不是抛出异常。
编辑:
正如@chernando 所说,使用 collection.find().toArray
会将整个集合导入 RAM。逐个部分进行处理的更好方法是流式传输数据,或者要求数据库按块提供数据。
此版本假设您有足够的内存来 collection.find().toArray
正常工作。
我可能会在稍后有空的时候带着我在评论中谈到的工具的改编回来。
var async = require('async');
function eachThread(auth) {
var gmail = google.gmail('v1'),
limit = 100; //Size of the parts
MongoClient.connect(mongoUrl, function(err, db){
assert.equal(null, err);
var collection = db.collection('threads'),
anotherCollection = db.collection('threadContents');
// Find all data in collection and convert it to array
collection.find().toArray(function(err, docs){
assert.equal(null, err);
var threadContents = [];
//Change here
async.mapLimit(docs, limit, (doc, callback) => {
gmail
.users
.threads
.get( {auth:auth,'userId':'me', 'id':docs[i].id}, function(err, resp){
if(err) {
return callback(err);
}
threadContents.push(resp);
console.log(threadContents.length);
console.log(threadContents[threadContents.length - 1].id);
anotherCollection.updateOne(
{id: threadContents[threadContents.length - 1].id},
threadContents[threadContents.length - 1],
{upsert:true},
function(err, result){
if(err) {
console.error(err);
} else {
console.log('updated one.');
}
callback(err);
});
});//end(callback(threads.get))
//Change here
}, (error) => {
if(error) {
console.error('Transfert stopped because of error:' + err);
} else {
console.log('Transfert successful');
}
});//end(async.mapLimit)
});//end(find.toArray)
});//end(callback(mongo.connect))
}//end(func(eachThread))