Mongodb 30 亿文档的不同聚合
Mongodb distinct aggregation of 3 billion documents
我有一个巨大的 collection,其中包含 30 亿个文档。每个文档如下所示:
"_id" : ObjectId("54c1a013715faf2cc0047c77"),
"service_type" : "JE",
"receiver_id" : NumberLong("865438083645"),
"time" : ISODate("2012-12-05T23:07:36Z"),
"duration" : 24,
"service_description" : "NQ",
"receiver_cell_id" : null,
"location_id" : "658_55525",
"caller_id" : NumberLong("475035504705")
我想获取不同用户的列表(他们应该至少作为呼叫者出现一次 'caller_id'),他们的计数(每个用户在 collection 中出现的次数呼叫者或接收者)和位置计数(如果他们是呼叫者)(即每个用户每个 location_id 的计数)。
我想以以下结尾:
"number_of_records" : 20,
"locations" : [{location_id: 658_55525, count:5}, {location_id: 840_5425, count:15}],
"user" : NumberLong("475035504705")
我尝试了 here and here 中描述的解决方案,但它们效率不够(非常慢)。实现这一目标的有效方法是什么?
map-reduce
解决方案比 aggregation
管道更适合这里,因为它避免了两个 unwinds
。如果您可以通过一次展开来提出聚合解决方案,那就可以了。但是下面的 map-reduce 解决方案是一种方法,尽管您需要针对大数据测量它的 运行 时间,看看它是否适合您。
map
函数:
var map = function(){
emit(this.caller_id,
{locs:[{"location_id":this.location_id,"count":1}]});
}
reduce
函数:
var reduce = function(key,values){
var result = {locs:[]};
var locations = {};
values.forEach(function(value){
value.locs.forEach(function(loc){
if(!locations[loc.location_id]){
locations[loc.location_id] = loc.count;
}
else{
locations[loc.location_id]++;
}
})
})
Object.keys(locations).forEach(function(k){
result.locs.push({"location_id":k,"count":locations[k]});
})
return result;
}
finalize
函数:
var finalize = function(key,value){
var total = 0;
value.locs.forEach(function(loc){
total += loc.count;
})
return {"total":total,"locs":value.locs};
}
调用 map-reduce:
db.collection.mapReduce(map,reduce,{"out":"t1","finalize":finalize});
在 map-reduce 生成输出后聚合结果。
db.t1.aggregate([
{$project:{"_id":0,
"number_of_records":"$value.total",
"locations":"$value.locs","user":"$_id"}}
])
样本o/p:
{
"number_of_records" : 3,
"locations" : [
{
"location_id" : "658_55525",
"count" : 1
},
{
"location_id" : "658_55525213",
"count" : 2
}
],
"user" : 2
}
{
"number_of_records" : 1,
"locations" : [
{
"location_id" : "658_55525",
"count" : 1
}
],
"user" : NumberLong("475035504705")
}
map-reduce java 脚本代码应该是自解释的。
对结果使用聚合:
db.<collection>.aggregate([
{ $group : { _id : { user: "$caller_id", localtion: '$location_id'} , count : { $sum : 1} } },
{ $project : { _id : 0, _id : '$_id.user', location : '$_id.localtion', count : '$count' } },
{ $group : { _id : '$_id', 'locations' : { $push : { location_id : '$location', count : '$count' } }, number_of_records : {$sum : '$count'} } },
{ $project : { _id : 0, user : '$_id', locations : '$locations', number_of_records : '$number_of_records'} },
{ $out : 'outputCollection'},
])
输出将是:
{
"0" : {
"locations" : [
{
"location_id" : "840_5425",
"count" : 8
},
{
"location_id" : "658_55525",
"count" : 5
}
],
"number_of_records" : 13,
"user" : NumberLong(475035504705)
}
}
更新 使用 allowDiskUse
:
var pipe = [
{ $group : { _id : { user: "$caller_id", localtion: '$location_id'} , count : { $sum : 1} } },
{ $project : { _id : 0, _id : '$_id.user', location : '$_id.localtion', count : '$count' } },
{ $group : { _id : '$_id', 'locations' : { $push : { location_id : '$location', count : '$count' } }, number_of_records : {$sum : '$count'} } },
{ $project : { _id : 0, user : '$_id', locations : '$locations', number_of_records : '$number_of_records'} },
{ $out : 'outputCollection'},
];
db.runCommand(
{ aggregate: "collection",
pipeline: pipe,
allowDiskUse: true
}
)
我有一个巨大的 collection,其中包含 30 亿个文档。每个文档如下所示:
"_id" : ObjectId("54c1a013715faf2cc0047c77"),
"service_type" : "JE",
"receiver_id" : NumberLong("865438083645"),
"time" : ISODate("2012-12-05T23:07:36Z"),
"duration" : 24,
"service_description" : "NQ",
"receiver_cell_id" : null,
"location_id" : "658_55525",
"caller_id" : NumberLong("475035504705")
我想获取不同用户的列表(他们应该至少作为呼叫者出现一次 'caller_id'),他们的计数(每个用户在 collection 中出现的次数呼叫者或接收者)和位置计数(如果他们是呼叫者)(即每个用户每个 location_id 的计数)。
我想以以下结尾:
"number_of_records" : 20,
"locations" : [{location_id: 658_55525, count:5}, {location_id: 840_5425, count:15}],
"user" : NumberLong("475035504705")
我尝试了 here and here 中描述的解决方案,但它们效率不够(非常慢)。实现这一目标的有效方法是什么?
map-reduce
解决方案比 aggregation
管道更适合这里,因为它避免了两个 unwinds
。如果您可以通过一次展开来提出聚合解决方案,那就可以了。但是下面的 map-reduce 解决方案是一种方法,尽管您需要针对大数据测量它的 运行 时间,看看它是否适合您。
map
函数:
var map = function(){
emit(this.caller_id,
{locs:[{"location_id":this.location_id,"count":1}]});
}
reduce
函数:
var reduce = function(key,values){
var result = {locs:[]};
var locations = {};
values.forEach(function(value){
value.locs.forEach(function(loc){
if(!locations[loc.location_id]){
locations[loc.location_id] = loc.count;
}
else{
locations[loc.location_id]++;
}
})
})
Object.keys(locations).forEach(function(k){
result.locs.push({"location_id":k,"count":locations[k]});
})
return result;
}
finalize
函数:
var finalize = function(key,value){
var total = 0;
value.locs.forEach(function(loc){
total += loc.count;
})
return {"total":total,"locs":value.locs};
}
调用 map-reduce:
db.collection.mapReduce(map,reduce,{"out":"t1","finalize":finalize});
在 map-reduce 生成输出后聚合结果。
db.t1.aggregate([
{$project:{"_id":0,
"number_of_records":"$value.total",
"locations":"$value.locs","user":"$_id"}}
])
样本o/p:
{
"number_of_records" : 3,
"locations" : [
{
"location_id" : "658_55525",
"count" : 1
},
{
"location_id" : "658_55525213",
"count" : 2
}
],
"user" : 2
}
{
"number_of_records" : 1,
"locations" : [
{
"location_id" : "658_55525",
"count" : 1
}
],
"user" : NumberLong("475035504705")
}
map-reduce java 脚本代码应该是自解释的。
对结果使用聚合:
db.<collection>.aggregate([
{ $group : { _id : { user: "$caller_id", localtion: '$location_id'} , count : { $sum : 1} } },
{ $project : { _id : 0, _id : '$_id.user', location : '$_id.localtion', count : '$count' } },
{ $group : { _id : '$_id', 'locations' : { $push : { location_id : '$location', count : '$count' } }, number_of_records : {$sum : '$count'} } },
{ $project : { _id : 0, user : '$_id', locations : '$locations', number_of_records : '$number_of_records'} },
{ $out : 'outputCollection'},
])
输出将是:
{
"0" : {
"locations" : [
{
"location_id" : "840_5425",
"count" : 8
},
{
"location_id" : "658_55525",
"count" : 5
}
],
"number_of_records" : 13,
"user" : NumberLong(475035504705)
}
}
更新 使用 allowDiskUse
:
var pipe = [
{ $group : { _id : { user: "$caller_id", localtion: '$location_id'} , count : { $sum : 1} } },
{ $project : { _id : 0, _id : '$_id.user', location : '$_id.localtion', count : '$count' } },
{ $group : { _id : '$_id', 'locations' : { $push : { location_id : '$location', count : '$count' } }, number_of_records : {$sum : '$count'} } },
{ $project : { _id : 0, user : '$_id', locations : '$locations', number_of_records : '$number_of_records'} },
{ $out : 'outputCollection'},
];
db.runCommand(
{ aggregate: "collection",
pipeline: pipe,
allowDiskUse: true
}
)