nodejs和mongoskin,所有项目保存后的回调

nodejs and mongoskin, callback after all items have been saved

我有以下代码,我在其中循环访问一个集合并执行另一个数据库查询并在其回调中构造一个对象。最后我将该对象保存到另一个集合中。

我希望在保存所有项目后调用另一个函数,但不知道如何调用。我尝试使用异步库,特别是当项目不为空时异步,但这只会让我陷入无限循环。

有没有办法确定所有项目何时都已保存?

谢谢!

var cursor = db.collection('user_apps').find({}, {timeout:false});
cursor.each(function (err, item) {
    if (err) {
        throw err;
    }
    if (item) {
        var appList = item.appList;
        var uuid= item.uuid;
        db.collection('app_categories').find({schema_name:{$in: appList}}).toArray(function (err, result) {
            if (err) throw err;
            var catCount = _.countBy(result, function (obj) {
                return obj.category;
            })
            catObj['_id'] = uuid;
            catObj['total_app_num'] = result.length;
            catObj['app_breakdown'] = catCount;
            db.collection('audiences').insert(catObj, function (err) {
                if (err) console.log(err);
            });
        }); 
    }
    else {
        // do something here after all items have been saved
    }
});

这里的关键是在执行 "loop" 操作时使用将遵守回调信号的东西。此处实现的 .each() 不会这样做,因此您需要一个 "async" 循环控件,它表示每个循环都已迭代并完成,并且在回调中有自己的回调。

如果您的基础 MongoDB 驱动程序至少是版本 2,那么会有一个 .forEach() 有一个回调,在循环完成时调用。这比 .each() 好,但它没有解决知道内部 "async" .insert() 操作何时完成的问题。

因此,更好的方法是使用 .find() 编辑的 stream interface return,这是允许更多的流量控制。有一个 .stream() 向后兼容的方法,但现代驱动程序默认只 return 接口:

var stream = db.collection('user_apps').find({});

stream.on("err",function(err){
    throw(err);
});

stream.on("data",function(item) {
    stream.pause();                 // pause processing of stream
    var appList = item.appList;
    var uuid= item.uuid;
    db.collection('app_categories').find({schema_name:{ "$in": appList}}).toArray(function (err, result) {
        if (err) throw err;
        var catCount = _.countBy(result, function (obj) {
            return obj.category;
        })

        var catObj = {};        // always re-init
        catObj['_id'] = uuid;
        catObj['total_app_num'] = result.length;
        catObj['app_breakdown'] = catCount;
        db.collection('audiences').insert(catObj, function (err) {
            if (err) console.log(err);
            stream.resume();        // resume stream processing
        });
    }); 
});

stream.on("end",function(){
    // stream complete and processing done
});

流上的 .pause() 方法停止发出更多事件,以便一次处理每个对象结果。当调用来自 .insert() 的回调时,将调用 .resume() 方法,表示该项目的处理已完成,可以进行新的调用以处理下一个项目。

流完成后,一切都已完成,因此调用 "end" 事件挂钩以继续您的代码。

这样,每个循环都用结束表示以移动到下一个迭代,并且有一个定义的 "end" 事件用于处理的完全结束。由于控件是 "inside" .insert() 回调,因此这些操作也可以完成。

附带说明一下,您可以考虑将 "category" 信息包含在源集合中,因为使用 .aggregate() 似乎可以更有效地 return 编辑您的结果,如果所有所需数据在一个集合中。