无法在数百万数据上运行 mongo shell 脚本

Cannot run mongo shell script on several million of data

我在 mongoshell 中有一个脚本,它应该从另一个集合(数据)填充一个集合(数据聚合),每 5 分钟聚合一次时间序列。
数据收集有 7.000.000 多个条目,脚本需要很长时间才能完成...需要 8 小时才能考虑 500.000 条数据,现在似乎已冻结。

基本上数据收集有这样的记录:

{
  isodate: '2014-12-1OT12:47:32.000+02.00',
  value: 234,
  parentID: 123      
}

dataaggreagtion 集合有如下记录:

{
   t: '2014-12-1OT12:45:00.000+02.00',
   pid: 123,  // parentID
   sum: 1234, // sum of all the value of data between 12:45 and 12:50
   count: 5,  // number of data elements between 12:45 and 12:50
   min: 23,   
   max: 435
}

数据集合的每条记录都将成为 dataaggregation 集合记录的一部分(在 count 属性中计为 1)。

// Cleanup collection
db.dataaggregation.remove({})

// Loop through data and populate the dataaggregation collection
db.data.find().addOption(DBQuery.Option.noTimeout).forEach(function(dt){
  // Get 5 minutes timestamp
  // eg: '2014-12-1OT12:47:32.000+02.00' => '2014-12-1OT12:45:00.000+02.00'
  dt.isodate.setMinutes(dt.isodate.getMinutes() - dt.isodate.getMinutes() % 5);
  dt.isodate.setSeconds(0);

  // Create the dataaggregation record for the (timestamp, parentID) couple if does
  // not exist or update the existing one
  var d = db.dataaggregation.findOne({t: dt.isodate, pid: dt.parentID});
  if(!d){
    db.dataaggregation.insert({
        t:dt.isodate,
        pid: dt.parentID,
        sum: dt.value,
        count: 1,
        min: dt.value,
        max: dt.value
    });
  }else{
    db.dataaggregation.update({
        t:dt.isodate,
        pid: dt.parentID
    },{
        $set:{
            sum: d.sum + dt.value,
            count: d.count + 1,
            min: dt.value < d.min ? dt.value : d.min,
            max: dt.value > d.max ? dt.value : d.max
        }
    },
    {upsert:true}
    );
  }
})

有什么改进的想法或建议吗?有什么明显的我想念的吗?

很难说为什么这么慢,但我 noticed/would 做了一些不同的事情:

  • 使用 $inc 而不是 $set 来增加计数和总和

  • 在 t 和 pid 上创建组合索引

您还可以考虑读取按 isodate 排序的数据,然后在到达新的 5 分钟存储桶后仅将 5 分钟存储桶写入 MongoDB。这将大大减少对聚合集合的读写量。

为什么不直接使用 aggregation framework for this? The $group pipeline does this along with other operators 来处理您的计算。

您可能需要 MongoDB 2.6 或更高版本的服务器才能执行此操作。我建议 运行 启用此选项,打开 "allowDiskUse" 选项并使用 $out 管道阶段写入集合。

您在这里需要做的第一件事是将日期中的所有 "string" 数据转换为真实的 Date 对象。这很容易做到,并且在 Whosebug 上被很好地引用,因为它是一个常见的建模错误。

最简单的方法可能是使用基本 "date math"。 MongoDB 中的日期对象通过返回 "epoch timestamp" 值(当从纪元日期本身中减去时,否则它只是一个具有毫秒差的数字)来响应针对其他日期对象的数学运算。这使得间隔变得简单:

db.data.aggregate([
    { "$group": {
        "_id": {
            "t": {
                "$subtract": [
                    { "$isoDate", new Date("1970-01-01") },
                    { "$subtract": [
                       { "$isoDate", new Date("1970-01-01") },
                       { "$mod": [
                           { "$isoDate", new Date("1970-01-01") },
                           1000 * 60 * 5
                       ]}
                    ]}                          
                ]
            },
            "pid": "$parentID"
        },
        "sum": { "$sum": "$value" },
        "count": { "$sum": 1 },
        "min": { "$min": "$value" },
        "max": { "$max": "$value" }
    }},
    { "$project": {
        "_id": 0,
        "t": "$_id.t",
        "pid": "$_id.pid",
        "sum": 1,
        "count": 1,
        "min": 1,
        "max": 1
    }},
    { "$out": "dataaggregation" }
],{ "allowDiskUse": true })

或使用日期聚合运算符进行类似操作:

db.data.aggregate([
    { "$group": {
        "_id": {
            "t": {
                "year": { "$year": "$isodate" },
                "month": { "$month": "$isodate" },
                "dayOfMonth": { "$dayOfMonth": "$isodate" },
                "hour": { "$hour": "$isodate" },
                "minute": {
                    "$mod": [
                        { "$minute": "$isodate" },
                        5
                    ]
                }
            },
            "pid": "$parentID"
        },
        "sum": { "$sum": "$value" },
        "count": { "$sum": 1 },
        "min": { "$min": "$value" },
        "max": { "$max": "$value" }
    }},
    { "$project": {
        "_id": 0,
        "t": "$_id.t",
        "pid": "$_id.pid",
        "sum": 1,
        "count": 1,
        "min": 1,
        "max": 1
    }},
    { "$out": "dataaggregation" }
],{ "allowDiskUse": true })

长篇大论,但这取决于你想要的输出。在任何一种情况下,基本方法都是使用模 $mod 运算符,以便根据每种情况下提供的数字结果确定 5 分钟的间隔。

无论哪种方式,您都不会在结果中得到 Date 对象,但您确实得到了可以轻松 "cast" 到 Date 对象中的东西。

如果您可以接受 "dataaggregation" 集合并使用复合 _id 字段代替 "t" 和 "pid" 以及提高效率。

只要您的服务器可以处理它,它就会 运行 比传输到客户端并写回数据库快得多。


补充一下,这里有一种使用 Bulk Operations API:

"cast" 所有字符串作为日期的方法
var bulk = db.collection.initializeOrderdBulkOp();
var counter = 0;

db.collection.find().forEach(function(doc) {
    bulk.find({ "_id": doc._id })
        .updateOne({ "$set": { "isodate": new Date(doc.isodate) } });
    counter++;

    if( counter % 1000 == 0 ) {
        bulk.execute();
        bulk = db.collection.initializeOrderdBulkOp();
    }  
});

if ( counter % 1000 != 0 )
    bulk.execute();

正如 Neil 所建议的,我建议您使用 Mongo 聚合。 如果你想聚合所有 5 分钟的数据,你可以使用:

db.data.aggregate([
    { "$group": {
        "_id": {
            "t": { $subtract: [{ $subtract: [ "$isodate", { $multiply: [{ $mod: [ {$minute:"$isodate"}, 5 ] }, 60*1000]} ] }, { $multiply: [{ $mod: [ {$second:"$isodate"}, 60 ] }, 1000]}]},
            "pid": "$variableID"
        },
        "sum": { "$sum": "$value" },
        "count": { "$sum": 1 },
        "min": { "$min": "$value" },
        "max": { "$max": "$value" }
    }},
    { "$project": {
        "_id": 0,
        "t": "$_id.t",
        "pid": "$_id.pid",
        "sum": 1,
        "count": 1,
        "min": 1,
        "max": 1
    }},
{ "$out": "dataaggregation" }
],{ "allowDiskUse": true })