使用 nodejs/async 批量和延迟 API 次调用
Batch and delay API calls with nodejs/async
我正在研究社交网络图,我想根据从 API.
获得的邻接列表构建一个 "six degrees of separation" 树
对于每个人,API 将 return 一组形式为 [id1, id2, id3...] 的朋友,这正是我想要的。但美中不足的是人数众多,API只允许400calls/15分钟。我可以将数据保存在本地数据库中,但我不想用请求淹没 API。
我正在做的伪代码是这样的:
requestCharacter = function(id) {
is this person in my db already? if true, return;
else make api call(error, function(){loopFriends(character)}) {
save character in database
}
}
loopFriends(character){
foreach(friend in character.friends) requestCharacter(friend);
}
我已经或多或少地编写了代码,它工作正常,但是由于它不断遍历树,并且由于人们在彼此的朋友列表中重复出现,所以效率非常低,并且不断破坏 API 限制
所以我想做的是将请求排队,在我添加之前检查队列中是否有东西,然后以每次 400 个或更少请求的批次运行队列。 (因此,如果队列中有 1200 个,它将运行 400 个,等待 15 分钟,运行 400,等待 15 分钟,运行 400...)
我尝试将 async.js 与它的队列一起使用,并且我能够将大量内容加载到队列中,但我认为它从未真正运行过。对于这种情况,最好的方法是什么?
我实际的非队列代码如下:
var lookupAndInsertCharacter = function(id){
Character.findOne({ 'id': id }, function (err, person) {
if (err) console.log(err);
else {
if(person!=null) {console.log('%s already exists in database, not saved', person.name); getCharacterFriends(id);}
else insertCharacter(id, function(){getCharacterFriends(id)});
};
})
}
var insertCharacter = function(id, callback){
var url = getCharacterURL(id);
request(url, function (error, response, body) {
if (!error && response.statusCode == 200) {
var result = JSON.parse(body);
if(result.status_code != 1 ) {console.log("ERROR status_code: %s. Please wait 15 minutes", result.status_code); return;}
else {
var me = new Character(processCharacter(result));
me.save(function(err){
if (err) return handleError(err);
});
console.log("Saved character "+me.name);
}
}
else {
console.log(error);
}
});
}
var getCharacterFriends = function(id) {
Character.findOne({ 'id': id }, function (err, person) {
if (err) console.log(err);
else {
console.log("Getting friends for %s",person.name);
_.each(person.character_friends, function(d){
lookupAndInsertCharacter(d);
});
console.log("Getting enemies for %s",person.name);
_.each(person.character_enemies, function(d){
lookupAndInsertCharacter(d);
})
};
})
}
在下面的示例中,我得到了我在 FaceBook 上的所有群组、上面的帖子和作者的 public 个人资料。
为了减慢这个过程,我创建了一个有限的 'scrapers' 池并将每个抓取器保留一段时间,所以我 "cannot overload the FaceBook server :)"
对于上面的例子,你可以
- 将您的池大小限制为 400
max : 400
并保留您的爬虫 15 分钟setTimeout(function(){pool.release(scraper);}, 15*60*1000);
- 或将您的池大小限制为 1
max : 1
并保留您的爬虫 3.75 秒 setTimeout(function(){pool.release(scraper);}, 3750);
代码来了
function saveData (anyJson) {
// put your Db communication here.
// console.log(anyJson);
}
function now() {
instant = new Date();
return instant.getHours() +':'+ instant.getMinutes() +':'+ instant.getSeconds() +'.'+ instant.getMilliseconds();
}
var graph = require('fbgraph');
console.log(process.argv[2]);
graph.setAccessToken(process.argv[2]);
var poolModule = require('generic-pool');
var pool = poolModule.Pool({
name : 'scraper',
create : function(callback) {
console.log(now() +' created scraper');
// parameter order: err, resource
callback(null, {created:now()});
},
destroy : function(scraper) {
console.log(now() +' released scraper created '+ scraper.created);
},
max : 10,
min : 1,
idleTimeoutMillis : 60*60*1000,
log : false
});
function pooledGraphGet(path,analyse) {
pool.acquire(function(err,scraper) {
if (err) {
console.log(now() +' Could not get a scraper for '+ path);
throw err;
}
graph.get(path,function(err,res) {
if (err) {
console.log(now() +' Could not get '+ path +' using scraper created '+ scraper.created);
throw err;
} else {
console.log(now() +' Got '+ path +' using scraper created '+ scraper.created);
setTimeout(function(){pool.release(scraper);}, 60*1000);
analyse(res);
}
});
});
}
pooledGraphGet('me?fields=friends,groups', function(res) {
res.groups.data.forEach(function(group) {
saveData (group);
pooledGraphGet(group.id +'?fields=id,name,members,feed', function(res) {
if (res.feed) res.feed.data.forEach(function(feed){
saveData (feed);
pooledGraphGet(feed.from.id +'?fields=id,name', function(res) {
saveData (res);
});
});
});
});
});
最终对我有用的是限制 API 调用的速率。我用了
https://github.com/wankdanker/node-function-rate-limit
然后我做了一个限定版的insertCharacter:
var rateLimit = require('function-rate-limit');
var insertLimited = rateLimit(400, 900000, function (id) {
insertCharacter(id);
});
我正在研究社交网络图,我想根据从 API.
获得的邻接列表构建一个 "six degrees of separation" 树对于每个人,API 将 return 一组形式为 [id1, id2, id3...] 的朋友,这正是我想要的。但美中不足的是人数众多,API只允许400calls/15分钟。我可以将数据保存在本地数据库中,但我不想用请求淹没 API。
我正在做的伪代码是这样的:
requestCharacter = function(id) {
is this person in my db already? if true, return;
else make api call(error, function(){loopFriends(character)}) {
save character in database
}
}
loopFriends(character){
foreach(friend in character.friends) requestCharacter(friend);
}
我已经或多或少地编写了代码,它工作正常,但是由于它不断遍历树,并且由于人们在彼此的朋友列表中重复出现,所以效率非常低,并且不断破坏 API 限制
所以我想做的是将请求排队,在我添加之前检查队列中是否有东西,然后以每次 400 个或更少请求的批次运行队列。 (因此,如果队列中有 1200 个,它将运行 400 个,等待 15 分钟,运行 400,等待 15 分钟,运行 400...)
我尝试将 async.js 与它的队列一起使用,并且我能够将大量内容加载到队列中,但我认为它从未真正运行过。对于这种情况,最好的方法是什么?
我实际的非队列代码如下:
var lookupAndInsertCharacter = function(id){
Character.findOne({ 'id': id }, function (err, person) {
if (err) console.log(err);
else {
if(person!=null) {console.log('%s already exists in database, not saved', person.name); getCharacterFriends(id);}
else insertCharacter(id, function(){getCharacterFriends(id)});
};
})
}
var insertCharacter = function(id, callback){
var url = getCharacterURL(id);
request(url, function (error, response, body) {
if (!error && response.statusCode == 200) {
var result = JSON.parse(body);
if(result.status_code != 1 ) {console.log("ERROR status_code: %s. Please wait 15 minutes", result.status_code); return;}
else {
var me = new Character(processCharacter(result));
me.save(function(err){
if (err) return handleError(err);
});
console.log("Saved character "+me.name);
}
}
else {
console.log(error);
}
});
}
var getCharacterFriends = function(id) {
Character.findOne({ 'id': id }, function (err, person) {
if (err) console.log(err);
else {
console.log("Getting friends for %s",person.name);
_.each(person.character_friends, function(d){
lookupAndInsertCharacter(d);
});
console.log("Getting enemies for %s",person.name);
_.each(person.character_enemies, function(d){
lookupAndInsertCharacter(d);
})
};
})
}
在下面的示例中,我得到了我在 FaceBook 上的所有群组、上面的帖子和作者的 public 个人资料。
为了减慢这个过程,我创建了一个有限的 'scrapers' 池并将每个抓取器保留一段时间,所以我 "cannot overload the FaceBook server :)"
对于上面的例子,你可以
- 将您的池大小限制为 400
max : 400
并保留您的爬虫 15 分钟setTimeout(function(){pool.release(scraper);}, 15*60*1000);
- 或将您的池大小限制为 1
max : 1
并保留您的爬虫 3.75 秒setTimeout(function(){pool.release(scraper);}, 3750);
代码来了
function saveData (anyJson) {
// put your Db communication here.
// console.log(anyJson);
}
function now() {
instant = new Date();
return instant.getHours() +':'+ instant.getMinutes() +':'+ instant.getSeconds() +'.'+ instant.getMilliseconds();
}
var graph = require('fbgraph');
console.log(process.argv[2]);
graph.setAccessToken(process.argv[2]);
var poolModule = require('generic-pool');
var pool = poolModule.Pool({
name : 'scraper',
create : function(callback) {
console.log(now() +' created scraper');
// parameter order: err, resource
callback(null, {created:now()});
},
destroy : function(scraper) {
console.log(now() +' released scraper created '+ scraper.created);
},
max : 10,
min : 1,
idleTimeoutMillis : 60*60*1000,
log : false
});
function pooledGraphGet(path,analyse) {
pool.acquire(function(err,scraper) {
if (err) {
console.log(now() +' Could not get a scraper for '+ path);
throw err;
}
graph.get(path,function(err,res) {
if (err) {
console.log(now() +' Could not get '+ path +' using scraper created '+ scraper.created);
throw err;
} else {
console.log(now() +' Got '+ path +' using scraper created '+ scraper.created);
setTimeout(function(){pool.release(scraper);}, 60*1000);
analyse(res);
}
});
});
}
pooledGraphGet('me?fields=friends,groups', function(res) {
res.groups.data.forEach(function(group) {
saveData (group);
pooledGraphGet(group.id +'?fields=id,name,members,feed', function(res) {
if (res.feed) res.feed.data.forEach(function(feed){
saveData (feed);
pooledGraphGet(feed.from.id +'?fields=id,name', function(res) {
saveData (res);
});
});
});
});
});
最终对我有用的是限制 API 调用的速率。我用了
https://github.com/wankdanker/node-function-rate-limit
然后我做了一个限定版的insertCharacter:
var rateLimit = require('function-rate-limit');
var insertLimited = rateLimit(400, 900000, function (id) {
insertCharacter(id);
});