使用 nodejs/async 批量和延迟 API 次调用

Batch and delay API calls with nodejs/async

我正在研究社交网络图,我想根据从 API.

获得的邻接列表构建一个 "six degrees of separation" 树

对于每个人,API 将 return 一组形式为 [id1, id2, id3...] 的朋友,这正是我想要的。但美中不足的是人数众多,API只允许400calls/15分钟。我可以将数据保存在本地数据库中,但我不想用请求淹没 API。

我正在做的伪代码是这样的:

requestCharacter = function(id) {
    is this person in my db already? if true, return;
    else make api call(error, function(){loopFriends(character)}) {
       save character in database
    }
}

loopFriends(character){
   foreach(friend in character.friends) requestCharacter(friend);
}

我已经或多或少地编写了代码,它工作正常,但是由于它不断遍历树,并且由于人们在彼此的朋友列表中重复出现,所以效率非常低,并且不断破坏 API 限制

所以我想做的是将请求排队,在我添加之前检查队列中是否有东西,然后以每次 400 个或更少请求的批次运行队列。 (因此,如果队列中有 1200 个,它将运行 400 个,等待 15 分钟,运行 400,等待 15 分钟,运行 400...)

我尝试将 async.js 与它的队列一起使用,并且我能够将大量内容加载到队列中,但我认为它从未真正运行过。对于这种情况,最好的方法是什么?

我实际的非队列代码如下:

var lookupAndInsertCharacter = function(id){
  Character.findOne({ 'id': id }, function (err, person) {
    if (err) console.log(err);
    else {
      if(person!=null) {console.log('%s already exists in database, not saved', person.name); getCharacterFriends(id);}
      else insertCharacter(id, function(){getCharacterFriends(id)});
    };
  })
}

var insertCharacter = function(id, callback){
    var url = getCharacterURL(id);
    request(url, function (error, response, body) {
    if (!error && response.statusCode == 200) {
      var result = JSON.parse(body);
      if(result.status_code != 1 ) {console.log("ERROR status_code: %s. Please wait 15 minutes", result.status_code); return;}
      else {
        var me = new Character(processCharacter(result));
        me.save(function(err){
          if (err) return handleError(err);
        });
        console.log("Saved character "+me.name);
      }  
    } 
    else {
      console.log(error);
    }
  }); 
}

var getCharacterFriends = function(id) {
  Character.findOne({ 'id': id }, function (err, person) {
    if (err) console.log(err);
    else {
      console.log("Getting friends for %s",person.name);
      _.each(person.character_friends, function(d){
        lookupAndInsertCharacter(d);
      });
      console.log("Getting enemies for %s",person.name);
      _.each(person.character_enemies, function(d){
        lookupAndInsertCharacter(d);
      })
    };
  })
}

在下面的示例中,我得到了我在 FaceBook 上的所有群组、上面的帖子和作者的 public 个人资料。

为了减慢这个过程,我创建了一个有限的 'scrapers' 池并将每个抓取器保留一段时间,所以我 "cannot overload the FaceBook server :)"

对于上面的例子,你可以

  • 将您的池大小限制为 400 max : 400 并保留您的爬虫 15 分钟setTimeout(function(){pool.release(scraper);}, 15*60*1000);
  • 或将您的池大小限制为 1 max : 1 并保留您的爬虫 3.75 秒 setTimeout(function(){pool.release(scraper);}, 3750);

代码来了

function saveData (anyJson) {
    // put your Db communication here.
    // console.log(anyJson);
}

function now() {
    instant = new Date();
    return instant.getHours() +':'+ instant.getMinutes() +':'+ instant.getSeconds() +'.'+ instant.getMilliseconds();
}
var graph = require('fbgraph');
console.log(process.argv[2]);
graph.setAccessToken(process.argv[2]);

var poolModule = require('generic-pool');
var pool = poolModule.Pool({
    name     : 'scraper',
    create   : function(callback) {
        console.log(now() +' created scraper');
        // parameter order: err, resource
        callback(null, {created:now()});
    },
    destroy  : function(scraper) { 
        console.log(now() +' released scraper created '+ scraper.created); 
    },
    max      : 10,
    min      : 1, 
    idleTimeoutMillis : 60*60*1000,
    log : false
});

function pooledGraphGet(path,analyse) {
    pool.acquire(function(err,scraper) {
        if (err) {
            console.log(now() +' Could not get a scraper for '+ path);
            throw err;
        }
        graph.get(path,function(err,res) {
            if (err) {
                console.log(now() +' Could not get '+ path +' using scraper created '+ scraper.created);
                throw err;
            } else {
                console.log(now() +' Got '+ path +' using scraper created '+ scraper.created);
                setTimeout(function(){pool.release(scraper);}, 60*1000);
                analyse(res);
            }
        });
    });
}

pooledGraphGet('me?fields=friends,groups', function(res) {
    res.groups.data.forEach(function(group) {
        saveData (group);
        pooledGraphGet(group.id +'?fields=id,name,members,feed', function(res) {
            if (res.feed) res.feed.data.forEach(function(feed){
                saveData (feed);
                pooledGraphGet(feed.from.id +'?fields=id,name', function(res) {
                    saveData (res);
                });
            });
        });
    });
});

最终对我有用的是限制 API 调用的速率。我用了

https://github.com/wankdanker/node-function-rate-limit

然后我做了一个限定版的insertCharacter:

var rateLimit = require('function-rate-limit');

var insertLimited = rateLimit(400, 900000, function (id) {
  insertCharacter(id);
});