无法在数百万数据上运行 mongo shell 脚本
Cannot run mongo shell script on several million of data
我在 mongoshell 中有一个脚本,它应该从另一个集合(数据)填充一个集合(数据聚合),每 5 分钟聚合一次时间序列。
数据收集有 7.000.000 多个条目,脚本需要很长时间才能完成...需要 8 小时才能考虑 500.000 条数据,现在似乎已冻结。
基本上数据收集有这样的记录:
{
isodate: '2014-12-1OT12:47:32.000+02.00',
value: 234,
parentID: 123
}
dataaggreagtion 集合有如下记录:
{
t: '2014-12-1OT12:45:00.000+02.00',
pid: 123, // parentID
sum: 1234, // sum of all the value of data between 12:45 and 12:50
count: 5, // number of data elements between 12:45 and 12:50
min: 23,
max: 435
}
数据集合的每条记录都将成为 dataaggregation 集合记录的一部分(在 count 属性中计为 1)。
// Cleanup collection
db.dataaggregation.remove({})
// Loop through data and populate the dataaggregation collection
db.data.find().addOption(DBQuery.Option.noTimeout).forEach(function(dt){
// Get 5 minutes timestamp
// eg: '2014-12-1OT12:47:32.000+02.00' => '2014-12-1OT12:45:00.000+02.00'
dt.isodate.setMinutes(dt.isodate.getMinutes() - dt.isodate.getMinutes() % 5);
dt.isodate.setSeconds(0);
// Create the dataaggregation record for the (timestamp, parentID) couple if does
// not exist or update the existing one
var d = db.dataaggregation.findOne({t: dt.isodate, pid: dt.parentID});
if(!d){
db.dataaggregation.insert({
t:dt.isodate,
pid: dt.parentID,
sum: dt.value,
count: 1,
min: dt.value,
max: dt.value
});
}else{
db.dataaggregation.update({
t:dt.isodate,
pid: dt.parentID
},{
$set:{
sum: d.sum + dt.value,
count: d.count + 1,
min: dt.value < d.min ? dt.value : d.min,
max: dt.value > d.max ? dt.value : d.max
}
},
{upsert:true}
);
}
})
有什么改进的想法或建议吗?有什么明显的我想念的吗?
很难说为什么这么慢,但我 noticed/would 做了一些不同的事情:
使用 $inc 而不是 $set 来增加计数和总和
在 t 和 pid 上创建组合索引
您还可以考虑读取按 isodate 排序的数据,然后在到达新的 5 分钟存储桶后仅将 5 分钟存储桶写入 MongoDB。这将大大减少对聚合集合的读写量。
为什么不直接使用 aggregation framework for this? The $group
pipeline does this along with other operators 来处理您的计算。
您可能需要 MongoDB 2.6 或更高版本的服务器才能执行此操作。我建议 运行 启用此选项,打开 "allowDiskUse" 选项并使用 $out
管道阶段写入集合。
您在这里需要做的第一件事是将日期中的所有 "string" 数据转换为真实的 Date
对象。这很容易做到,并且在 Whosebug 上被很好地引用,因为它是一个常见的建模错误。
最简单的方法可能是使用基本 "date math"。 MongoDB 中的日期对象通过返回 "epoch timestamp" 值(当从纪元日期本身中减去时,否则它只是一个具有毫秒差的数字)来响应针对其他日期对象的数学运算。这使得间隔变得简单:
db.data.aggregate([
{ "$group": {
"_id": {
"t": {
"$subtract": [
{ "$isoDate", new Date("1970-01-01") },
{ "$subtract": [
{ "$isoDate", new Date("1970-01-01") },
{ "$mod": [
{ "$isoDate", new Date("1970-01-01") },
1000 * 60 * 5
]}
]}
]
},
"pid": "$parentID"
},
"sum": { "$sum": "$value" },
"count": { "$sum": 1 },
"min": { "$min": "$value" },
"max": { "$max": "$value" }
}},
{ "$project": {
"_id": 0,
"t": "$_id.t",
"pid": "$_id.pid",
"sum": 1,
"count": 1,
"min": 1,
"max": 1
}},
{ "$out": "dataaggregation" }
],{ "allowDiskUse": true })
或使用日期聚合运算符进行类似操作:
db.data.aggregate([
{ "$group": {
"_id": {
"t": {
"year": { "$year": "$isodate" },
"month": { "$month": "$isodate" },
"dayOfMonth": { "$dayOfMonth": "$isodate" },
"hour": { "$hour": "$isodate" },
"minute": {
"$mod": [
{ "$minute": "$isodate" },
5
]
}
},
"pid": "$parentID"
},
"sum": { "$sum": "$value" },
"count": { "$sum": 1 },
"min": { "$min": "$value" },
"max": { "$max": "$value" }
}},
{ "$project": {
"_id": 0,
"t": "$_id.t",
"pid": "$_id.pid",
"sum": 1,
"count": 1,
"min": 1,
"max": 1
}},
{ "$out": "dataaggregation" }
],{ "allowDiskUse": true })
长篇大论,但这取决于你想要的输出。在任何一种情况下,基本方法都是使用模 $mod
运算符,以便根据每种情况下提供的数字结果确定 5 分钟的间隔。
无论哪种方式,您都不会在结果中得到 Date
对象,但您确实得到了可以轻松 "cast" 到 Date
对象中的东西。
如果您可以接受 "dataaggregation" 集合并使用复合 _id
字段代替 "t" 和 "pid" 以及提高效率。
只要您的服务器可以处理它,它就会 运行 比传输到客户端并写回数据库快得多。
补充一下,这里有一种使用 Bulk Operations API:
"cast" 所有字符串作为日期的方法
var bulk = db.collection.initializeOrderdBulkOp();
var counter = 0;
db.collection.find().forEach(function(doc) {
bulk.find({ "_id": doc._id })
.updateOne({ "$set": { "isodate": new Date(doc.isodate) } });
counter++;
if( counter % 1000 == 0 ) {
bulk.execute();
bulk = db.collection.initializeOrderdBulkOp();
}
});
if ( counter % 1000 != 0 )
bulk.execute();
正如 Neil 所建议的,我建议您使用 Mongo 聚合。
如果你想聚合所有 5 分钟的数据,你可以使用:
db.data.aggregate([
{ "$group": {
"_id": {
"t": { $subtract: [{ $subtract: [ "$isodate", { $multiply: [{ $mod: [ {$minute:"$isodate"}, 5 ] }, 60*1000]} ] }, { $multiply: [{ $mod: [ {$second:"$isodate"}, 60 ] }, 1000]}]},
"pid": "$variableID"
},
"sum": { "$sum": "$value" },
"count": { "$sum": 1 },
"min": { "$min": "$value" },
"max": { "$max": "$value" }
}},
{ "$project": {
"_id": 0,
"t": "$_id.t",
"pid": "$_id.pid",
"sum": 1,
"count": 1,
"min": 1,
"max": 1
}},
{ "$out": "dataaggregation" }
],{ "allowDiskUse": true })
我在 mongoshell 中有一个脚本,它应该从另一个集合(数据)填充一个集合(数据聚合),每 5 分钟聚合一次时间序列。
数据收集有 7.000.000 多个条目,脚本需要很长时间才能完成...需要 8 小时才能考虑 500.000 条数据,现在似乎已冻结。
基本上数据收集有这样的记录:
{
isodate: '2014-12-1OT12:47:32.000+02.00',
value: 234,
parentID: 123
}
dataaggreagtion 集合有如下记录:
{
t: '2014-12-1OT12:45:00.000+02.00',
pid: 123, // parentID
sum: 1234, // sum of all the value of data between 12:45 and 12:50
count: 5, // number of data elements between 12:45 and 12:50
min: 23,
max: 435
}
数据集合的每条记录都将成为 dataaggregation 集合记录的一部分(在 count 属性中计为 1)。
// Cleanup collection
db.dataaggregation.remove({})
// Loop through data and populate the dataaggregation collection
db.data.find().addOption(DBQuery.Option.noTimeout).forEach(function(dt){
// Get 5 minutes timestamp
// eg: '2014-12-1OT12:47:32.000+02.00' => '2014-12-1OT12:45:00.000+02.00'
dt.isodate.setMinutes(dt.isodate.getMinutes() - dt.isodate.getMinutes() % 5);
dt.isodate.setSeconds(0);
// Create the dataaggregation record for the (timestamp, parentID) couple if does
// not exist or update the existing one
var d = db.dataaggregation.findOne({t: dt.isodate, pid: dt.parentID});
if(!d){
db.dataaggregation.insert({
t:dt.isodate,
pid: dt.parentID,
sum: dt.value,
count: 1,
min: dt.value,
max: dt.value
});
}else{
db.dataaggregation.update({
t:dt.isodate,
pid: dt.parentID
},{
$set:{
sum: d.sum + dt.value,
count: d.count + 1,
min: dt.value < d.min ? dt.value : d.min,
max: dt.value > d.max ? dt.value : d.max
}
},
{upsert:true}
);
}
})
有什么改进的想法或建议吗?有什么明显的我想念的吗?
很难说为什么这么慢,但我 noticed/would 做了一些不同的事情:
使用 $inc 而不是 $set 来增加计数和总和
在 t 和 pid 上创建组合索引
您还可以考虑读取按 isodate 排序的数据,然后在到达新的 5 分钟存储桶后仅将 5 分钟存储桶写入 MongoDB。这将大大减少对聚合集合的读写量。
为什么不直接使用 aggregation framework for this? The $group
pipeline does this along with other operators 来处理您的计算。
您可能需要 MongoDB 2.6 或更高版本的服务器才能执行此操作。我建议 运行 启用此选项,打开 "allowDiskUse" 选项并使用 $out
管道阶段写入集合。
您在这里需要做的第一件事是将日期中的所有 "string" 数据转换为真实的 Date
对象。这很容易做到,并且在 Whosebug 上被很好地引用,因为它是一个常见的建模错误。
最简单的方法可能是使用基本 "date math"。 MongoDB 中的日期对象通过返回 "epoch timestamp" 值(当从纪元日期本身中减去时,否则它只是一个具有毫秒差的数字)来响应针对其他日期对象的数学运算。这使得间隔变得简单:
db.data.aggregate([
{ "$group": {
"_id": {
"t": {
"$subtract": [
{ "$isoDate", new Date("1970-01-01") },
{ "$subtract": [
{ "$isoDate", new Date("1970-01-01") },
{ "$mod": [
{ "$isoDate", new Date("1970-01-01") },
1000 * 60 * 5
]}
]}
]
},
"pid": "$parentID"
},
"sum": { "$sum": "$value" },
"count": { "$sum": 1 },
"min": { "$min": "$value" },
"max": { "$max": "$value" }
}},
{ "$project": {
"_id": 0,
"t": "$_id.t",
"pid": "$_id.pid",
"sum": 1,
"count": 1,
"min": 1,
"max": 1
}},
{ "$out": "dataaggregation" }
],{ "allowDiskUse": true })
或使用日期聚合运算符进行类似操作:
db.data.aggregate([
{ "$group": {
"_id": {
"t": {
"year": { "$year": "$isodate" },
"month": { "$month": "$isodate" },
"dayOfMonth": { "$dayOfMonth": "$isodate" },
"hour": { "$hour": "$isodate" },
"minute": {
"$mod": [
{ "$minute": "$isodate" },
5
]
}
},
"pid": "$parentID"
},
"sum": { "$sum": "$value" },
"count": { "$sum": 1 },
"min": { "$min": "$value" },
"max": { "$max": "$value" }
}},
{ "$project": {
"_id": 0,
"t": "$_id.t",
"pid": "$_id.pid",
"sum": 1,
"count": 1,
"min": 1,
"max": 1
}},
{ "$out": "dataaggregation" }
],{ "allowDiskUse": true })
长篇大论,但这取决于你想要的输出。在任何一种情况下,基本方法都是使用模 $mod
运算符,以便根据每种情况下提供的数字结果确定 5 分钟的间隔。
无论哪种方式,您都不会在结果中得到 Date
对象,但您确实得到了可以轻松 "cast" 到 Date
对象中的东西。
如果您可以接受 "dataaggregation" 集合并使用复合 _id
字段代替 "t" 和 "pid" 以及提高效率。
只要您的服务器可以处理它,它就会 运行 比传输到客户端并写回数据库快得多。
补充一下,这里有一种使用 Bulk Operations API:
"cast" 所有字符串作为日期的方法var bulk = db.collection.initializeOrderdBulkOp();
var counter = 0;
db.collection.find().forEach(function(doc) {
bulk.find({ "_id": doc._id })
.updateOne({ "$set": { "isodate": new Date(doc.isodate) } });
counter++;
if( counter % 1000 == 0 ) {
bulk.execute();
bulk = db.collection.initializeOrderdBulkOp();
}
});
if ( counter % 1000 != 0 )
bulk.execute();
正如 Neil 所建议的,我建议您使用 Mongo 聚合。 如果你想聚合所有 5 分钟的数据,你可以使用:
db.data.aggregate([
{ "$group": {
"_id": {
"t": { $subtract: [{ $subtract: [ "$isodate", { $multiply: [{ $mod: [ {$minute:"$isodate"}, 5 ] }, 60*1000]} ] }, { $multiply: [{ $mod: [ {$second:"$isodate"}, 60 ] }, 1000]}]},
"pid": "$variableID"
},
"sum": { "$sum": "$value" },
"count": { "$sum": 1 },
"min": { "$min": "$value" },
"max": { "$max": "$value" }
}},
{ "$project": {
"_id": 0,
"t": "$_id.t",
"pid": "$_id.pid",
"sum": 1,
"count": 1,
"min": 1,
"max": 1
}},
{ "$out": "dataaggregation" }
],{ "allowDiskUse": true })