Mongodb 30 亿文档的不同聚合

Mongodb distinct aggregation of 3 billion documents

我有一个巨大的 collection,其中包含 30 亿个文档。每个文档如下所示:

"_id" : ObjectId("54c1a013715faf2cc0047c77"),
"service_type" : "JE",
"receiver_id" : NumberLong("865438083645"),
"time" : ISODate("2012-12-05T23:07:36Z"),
"duration" : 24,
"service_description" : "NQ",
"receiver_cell_id" : null,
"location_id" : "658_55525",
"caller_id" : NumberLong("475035504705")

我想获取不同用户的列表(他们应该至少作为呼叫者出现一次 'caller_id'),他们的计数(每个用户在 collection 中出现的次数呼叫者或接收者)和位置计数(如果他们是呼叫者)(即每个用户每个 location_id 的计数)。

我想以以下结尾:

"number_of_records" : 20,
"locations" : [{location_id: 658_55525, count:5}, {location_id: 840_5425, count:15}],
"user" : NumberLong("475035504705")

我尝试了 here and here 中描述的解决方案,但它们效率不够(非常慢)。实现这一目标的有效方法是什么?

map-reduce 解决方案比 aggregation 管道更适合这里,因为它避免了两个 unwinds。如果您可以通过一次展开来提出聚合解决方案,那就可以了。但是下面的 map-reduce 解决方案是一种方法,尽管您需要针对大数据测量它的 运行 时间,看看它是否适合您。

map函数:

var map = function(){
    emit(this.caller_id,
        {locs:[{"location_id":this.location_id,"count":1}]});
}

reduce函数:

var reduce = function(key,values){
    var result = {locs:[]};
    var locations = {};
    values.forEach(function(value){
        value.locs.forEach(function(loc){
                if(!locations[loc.location_id]){
                    locations[loc.location_id] = loc.count;
                }
                else{
                    locations[loc.location_id]++;
                }
        })
    })
    Object.keys(locations).forEach(function(k){
        result.locs.push({"location_id":k,"count":locations[k]});
    })
    return result;
}

finalize函数:

var finalize = function(key,value){
    var total = 0;
    value.locs.forEach(function(loc){
        total += loc.count;
    })
    return {"total":total,"locs":value.locs};
}

调用 map-reduce:

db.collection.mapReduce(map,reduce,{"out":"t1","finalize":finalize});

在 map-reduce 生成输出后聚合结果。

db.t1.aggregate([
{$project:{"_id":0,
           "number_of_records":"$value.total",
           "locations":"$value.locs","user":"$_id"}}
])

样本o/p:

{
        "number_of_records" : 3,
        "locations" : [
                {
                        "location_id" : "658_55525",
                        "count" : 1
                },
                {
                        "location_id" : "658_55525213",
                        "count" : 2
                }
        ],
        "user" : 2
}
{
        "number_of_records" : 1,
        "locations" : [
                {
                        "location_id" : "658_55525",
                        "count" : 1
                }
        ],
        "user" : NumberLong("475035504705")
}

map-reduce java 脚本代码应该是自解释的。

对结果使用聚合:

db.<collection>.aggregate([
   { $group : { _id : { user:  "$caller_id", localtion: '$location_id'} , count : { $sum : 1}  } },
   { $project : { _id : 0, _id : '$_id.user', location : '$_id.localtion', count : '$count' } },
   { $group : { _id : '$_id', 'locations' : { $push : { location_id : '$location', count : '$count' } }, number_of_records : {$sum : '$count'} } },
   { $project : { _id : 0, user : '$_id', locations : '$locations', number_of_records : '$number_of_records'} },
   { $out : 'outputCollection'},
])

输出将是:

{
    "0" : {
        "locations" : [ 
            {
                "location_id" : "840_5425",
                "count" : 8
            }, 
            {
                "location_id" : "658_55525",
                "count" : 5
            }
        ],
        "number_of_records" : 13,
        "user" : NumberLong(475035504705)
    }
}

更新 使用 allowDiskUse:

var pipe = [
   { $group : { _id : { user:  "$caller_id", localtion: '$location_id'} , count : { $sum : 1}  } },
   { $project : { _id : 0, _id : '$_id.user', location : '$_id.localtion', count : '$count' } },
   { $group : { _id : '$_id', 'locations' : { $push : { location_id : '$location', count : '$count' } }, number_of_records : {$sum : '$count'} } },
   { $project : { _id : 0, user : '$_id', locations : '$locations', number_of_records : '$number_of_records'} },
   { $out : 'outputCollection'},
];

db.runCommand(
   { aggregate: "collection",
     pipeline: pipe,
     allowDiskUse: true
   }
)