How to rereduce with Mongoid to aggregate data from two distinct fields?
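Context
Two MongoDB document types are mapped to Rails/Mongoid classes: Task and Subscription. For performance reasons, Subscription.current_task stores a Task::CurrentTask, which holds a subset of Task's attributes. The actual current task of a subscription, however, is the task with the highest Task#pos for the given Task#subscription_id.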
Problem
Some inconsistencies have appeared between attributes of Subscription.current_task and the attributes of the Task they should match, particularly the state field.
Goal
List all current tasks in Subscription that do not match the last task of their subscription.
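Intended solution
First, map/reduce Task to get the last task of each subscription and store it in a temporary collection. Second, rereduce on Subscription into this temporary collection, to get for each subscription one object holding both the actual last task and the currently embedded subset copy. Third, build a report of the elements whose actual and copied tasks do not match.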
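To make the three steps concrete, here is a minimal in-memory sketch in plain Ruby (2.7+ for filter_map) that runs without MongoDB. It assumes tasks and subscriptions are plain hashes using the field names from this post (subscription_id, pos, state, current_task); the sample data and the helpers last_tasks_by_subscription and inconsistent_subscriptions are hypothetical stand-ins for the collections and jobs. Like the query further down, it only compares state.

```ruby
# Hypothetical in-memory stand-ins for the Task and Subscription collections.
TASKS = [
  { _id: "t1", subscription_id: "s1", pos: 0, state: 10 },
  { _id: "t2", subscription_id: "s1", pos: 1, state: 40 },
  { _id: "t3", subscription_id: "s2", pos: 0, state: 20 }
].freeze

SUBSCRIPTIONS = [
  # s1's embedded copy is stale: it still mirrors t1 with state 10.
  { _id: "s1", current_task: { task_id: "t1", pos: 0, state: 10 } },
  { _id: "s2", current_task: { task_id: "t3", pos: 0, state: 20 } },
  { _id: "s3", current_task: nil }
].freeze

# Step 1: the actual current task is the one with the highest pos per subscription.
def last_tasks_by_subscription(tasks)
  tasks.group_by { |t| t[:subscription_id] }
       .transform_values { |ts| ts.max_by { |t| t[:pos] } }
end

# Steps 2 and 3: pair each embedded copy with the actual last task, keep mismatches.
def inconsistent_subscriptions(tasks, subscriptions)
  last = last_tasks_by_subscription(tasks)
  subscriptions.filter_map do |sub|
    copy = sub[:current_task]
    actual = last[sub[:_id]]
    next if copy.nil? && actual.nil? # nothing to compare
    mismatch = copy.nil? || actual.nil? || copy[:state] != actual[:state]
    { subscription_id: sub[:_id], current_task: copy, task: actual } if mismatch
  end
end

p inconsistent_subscriptions(TASKS, SUBSCRIPTIONS).map { |r| r[:subscription_id] }
# prints ["s1"]
```

The MongoDB version replaces step 1 with the Task map/reduce and step 2 with the merge into the temporary collection; the comparison logic stays the same.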
Difficulty
Although I have read the official MongoDB and Mongoid documentation, and other examples in miscellaneous blogs such as MongoDB Map Re-Reduce and joins – performance tuning and MongoDB, Mongoid, MapReduce and Embedded Documents, I still can't come up with a working solution for the rereduce step.
Non-functional solution written so far:
# map/reduce of tasks to get the last one of each subscription
last_task_map = %Q{
  function() {
    var key = this.subscription_id;
    var value = {
      task: {
        pos: this.pos,
        task_id: this._id,
        state: this.state
      },
      current_task: null
    };
    emit(key, value);
  }
}
last_task_reduce = %Q{
  function(key, tasks) {
    var last_task = tasks[0];
    for ( var i=1; i < tasks.length; i++ ) {
      if(tasks[i].pos > last_task.pos) {
        last_task = tasks[i];
      }
    }
    var value = {
      task: {pos: last_task.pos, task_id: last_task.task_id, state: last_task.state},
      current_task: null
    };
    return value;
  }
}
# map/reduce of `current_task`s to merge with previous results
subscription_map = %Q{
  function() {
    if(!this.current_task) {
      return;
    }
    var key = this._id;
    var value = {
      task: null,
      current_task: {
        pos: this.current_task.pos,
        task_id: this.current_task.task_id,
        state: this.current_task.state,
        source: 'current_task',
      }
    };
    emit(key, value);
  };
}
reduce = %Q{
  function(key, tasks) {
    if(tasks[0].current_task == nill) {
      return {task: tasks[0].task, current_task: tasks[1].current_task};
    }
    return {task: tasks[1].task, current_task: tasks[0].current_task};
  }
}
buffer = 'current_task_consistency'
# temporary collection seems unremoved when serially calling the script with
# `load` in a `rails c` prompt, so we drop it to avoid unwanted glitch merge
Mongoid.default_client[buffer].drop
t = Task.map_reduce(last_task_map, last_task_reduce).out(replace: buffer)
s = Subscription.map_reduce(subscription_map, reduce).out(reduce: buffer)
t.each{ |e| puts e } # ok: `{"_id"=>BSON::ObjectId('592dd603e138236671587b04'), "value"=>{"task"=>{"pos"=>0.0, "task_id"=>BSON::ObjectId('592dd604e138236671587b0f'), "state"=>40.0}, "current_task"=>nil}}`
puts t.counts # ok: {"input"=>83900, "emit"=>83900, "reduce"=>36115, "output"=>28625}
s.each{ |e| puts e } # ko: {"_id"=>BSON::ObjectId('592dd603e138236671587b04'), "value"=>{"task"=>nil, "current_task"=>{"pos"=>0.0, "task_id"=>BSON::ObjectId('592dd604e138236671587b0f'), "state"=>40.0, "source"=>"current_task"}}}
puts s.counts # ko: {"input"=>28632, "emit"=>28624, "reduce"=>0, "output"=>28624}
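The expected result of the second map/reduce is a merge of the current_task_consistency and subscription_map results, all of which should go through reduce when executed. According to counts, however, none of them does, and indeed the output of the s elements shows that no task key was assigned a value from current_task_consistency.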
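For reference, MongoDB documents the reduce output action this way: when the output collection already holds a document with the same key as a new result, the reduce function is applied to the existing and the new value, and the result overwrites the existing document; keys without a match are inserted as they are. The plain-Ruby sketch below simulates only that merge step, to show what the second job was expected to produce. The merge_with_reduce helper and the [existing, new] argument order are assumptions for illustration; the ported reduce accepts either order, like the one in the post.

```ruby
# Ruby port of the post's second reduce (nil? plays the role of the JS null check).
# It pairs whichever value carries the task with whichever carries the current_task.
SUBSCRIPTION_REDUCE = lambda do |_key, values|
  if values[0][:current_task].nil?
    { task: values[0][:task], current_task: values[1][:current_task] }
  else
    { task: values[1][:task], current_task: values[0][:current_task] }
  end
end

# Hypothetical simulation of out(reduce: collection): keys already present are
# re-reduced against the new value, other keys are inserted unchanged.
def merge_with_reduce(existing, new_results, reduce)
  merged = existing.dup
  new_results.each do |key, value|
    merged[key] = merged.key?(key) ? reduce.call(key, [merged[key], value]) : value
  end
  merged
end

# Output of the first job: one last task per subscription id.
buffer = { "s1" => { task: { pos: 1, state: 40 }, current_task: nil } }
# Output of the second job: one embedded copy per subscription id.
subscriptions = { "s1" => { task: nil, current_task: { pos: 0, state: 10 } } }

p merge_with_reduce(buffer, subscriptions, SUBSCRIPTION_REDUCE)["s1"]
```

Each merged value ends up carrying both sides, which is the shape the report step needs.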
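Questions related to the exposed problem
- What is wrong with the implementation?
- As far as I understand, this solution does provide reduce functions that are idempotent and whose output is consistent with what the corresponding map functions return. What might I have misunderstood about how the out parameter works and how rereduce input/output should be managed?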
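MongoDB's map-reduce documentation lists the requirements this question alludes to: reduce must return a value of the same shape as what map emits, must be idempotent (reduce(key, [reduce(key, vs)]) equals reduce(key, vs)), must not depend on the order of the values array, and must allow partial results to be re-reduced (reduce(key, [A, reduce(key, [B, C])]) equals reduce(key, [A, B, C])). The sketch below checks those properties in plain Ruby against a port of the "last task" reduce that reads pos through the nested task field, where the map function puts it. The check_reduce helper and the sample values are hypothetical.

```ruby
# Ruby port of a "last task" reduce: keep the value whose nested task has the highest pos.
LAST_TASK_REDUCE = lambda do |_key, values|
  last = values.max_by { |v| v[:task][:pos] }
  { task: last[:task], current_task: nil }
end

# Hypothetical property checks mirroring MongoDB's documented reduce requirements.
def check_reduce(reduce, key, values)
  full = reduce.call(key, values)
  {
    idempotent: reduce.call(key, [full]) == full,
    order_independent: reduce.call(key, values.reverse) == full,
    rereducible: reduce.call(key, [values.first, reduce.call(key, values.drop(1))]) == full,
    same_shape: full.keys.sort == values.first.keys.sort
  }
end

# Values shaped like what the Task map function emits.
emitted = [0, 2, 1].map do |pos|
  { task: { pos: pos, task_id: "t#{pos}", state: pos * 10 }, current_task: nil }
end

p check_reduce(LAST_TASK_REDUCE, "s1", emitted)
```

Running the same checks against a reduce that reads values[i].pos directly would fail, since pos only exists under task.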
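Additional notes
The third step, generating the report, was meant to be implemented as a finalize function applied to the second map/reduce. But a third map/reduce might be a better approach, or might not. Overall, the structure of the implementation may be quite poor, at least from a performance point of view, so feedback on that point is welcome too.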
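The first problem with the proposed solution was simply a Ruby/JS syntax mix-up: nill instead of null. Unfortunately the script failed silently, at least in the pry console where I ran load current_task_consistency.rb.
Here is a working solution, with two map/reduce jobs and one query on the resulting temporary collection.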
# map/reduce of tasks to get the last one of each subscription
last_task_map = %Q{
  function() {
    var key = this.subscription_id;
    var value = {
      task: {
        pos: this.pos,
        task_id: this._id,
        state: this.state
      },
      current_task: null
    };
    emit(key, value);
  }
}
# reduce receives values shaped like what last_task_map emits, so pos,
# task_id and state are read through the nested `task` field
last_task_reduce = %Q{
  function(key, tasks) {
    var last_task = tasks[0];
    for ( var i=1; i < tasks.length; i++ ) {
      if(tasks[i].task.pos > last_task.task.pos) {
        last_task = tasks[i];
      }
    }
    var value = {
      task: {pos: last_task.task.pos, task_id: last_task.task.task_id, state: last_task.task.state},
      current_task: null
    };
    return value;
  }
}
# map/reduce of `current_task`s merged side by side with the corresponding
# subscription last task
subscription_map = %Q{
  function() {
    if(!this.current_task) {
      return;
    }
    var key = this._id;
    var value = {
      task: null,
      current_task: {
        pos: this.current_task.pos,
        task_id: this.current_task.task_id,
        state: this.current_task.state,
      }
    };
    emit(key, value);
  };
}
# called with the existing buffer value and the new value for the same key;
# keeps the task from one side and the current_task from the other
subscription_reduce = %Q{
  function(key, tasks) {
    if(tasks[0].current_task == null) {
      return {task: tasks[0].task, current_task: tasks[1].current_task};
    }
    return {task: tasks[1].task, current_task: tasks[0].current_task};
  }
}
buffer = 'current_task_consistency'
# temporary collection seems unremoved when serially calling the script with
# `load` in a `rails c` prompt, so we drop it to avoid unwanted merge glitch
Mongoid.default_client[buffer].drop
Task.map_reduce(last_task_map, last_task_reduce).
  out(replace: buffer).
  execute
Subscription.
  map_reduce(subscription_map, subscription_reduce).
  out(reduce: buffer).
  execute
# report entries whose embedded copy is missing, whose last task is missing,
# or whose copied state differs from the actual last task's state
ascertain_inconsistency = %Q{
  this.value.current_task == null ||
  this.value.task == null ||
  this.value.current_task.state != this.value.task.state
}
inconsistencies = Mongoid.default_client[buffer].
  find( { "$where": ascertain_inconsistency } )
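The $where condition can be mirrored in plain Ruby to check how it behaves on the document shapes the buffer collection can hold ({_id, value: {task, current_task}}). This is an illustrative sketch, not part of the original answer: the inconsistent? helper and the sample documents are hypothetical, and the helper treats a missing task as an inconsistency so that such a subscription is reported rather than failing on a null access.

```ruby
# Hypothetical Ruby mirror of the $where condition, applied to buffer-shaped documents.
def inconsistent?(doc)
  value = doc[:value]
  return true if value[:current_task].nil? || value[:task].nil?

  value[:current_task][:state] != value[:task][:state]
end

docs = [
  { _id: "s1", value: { task: { state: 40 }, current_task: { state: 10 } } }, # stale copy
  { _id: "s2", value: { task: { state: 20 }, current_task: { state: 20 } } }, # consistent
  { _id: "s3", value: { task: { state: 30 }, current_task: nil } },           # no embedded copy
  { _id: "s4", value: { task: nil, current_task: { state: 5 } } }             # no task found
]

p docs.select { |d| inconsistent?(d) }.map { |d| d[:_id] }
# prints ["s1", "s3", "s4"]
```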