How to rereduce with Mongoid to aggregate data from two distinct fields?

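Context

Two MongoDB documents are mapped to Rails/Mongoid classes. The two classes are Task and Subscription. For performance reasons, Subscription.current_task stores a Task::CurrentTask, which contains a subset of the attributes of Task, but the actual current task matching a subscription is the one with the highest Task#pos for the given Task#subscription_id.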

Problem

Some inconsistencies have appeared between attributes of Subscription.current_task and the attributes of the Task it should match, especially the state field.

Goal

List every current task in Subscription that does not match the last task of that subscription.

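Intended solution

First, map/reduce Task to get the last task of each subscription and store it in a temporary collection. Second, rereduce with this temporary collection on Subscription, to obtain one object per subscription containing both the actual last task and the currently embedded subset copy. Third, build a report of the elements where the actual task and the copied task do not match.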
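To make the three steps concrete, here is a minimal plain-Ruby sketch of the same logic on in-memory hashes. It does not touch MongoDB at all; the method names and the sample data are hypothetical and only illustrate the intended result.

```ruby
# Step 1: for each subscription, keep the task with the highest pos.
def last_task_by_subscription(tasks)
  tasks.group_by { |t| t[:subscription_id] }
       .transform_values { |group| group.max_by { |t| t[:pos] } }
end

# Step 2: put the actual last task next to the embedded copy.
def merge_with_current_tasks(subscriptions, last_tasks)
  subscriptions.map do |s|
    { _id: s[:_id], task: last_tasks[s[:_id]], current_task: s[:current_task] }
  end
end

# Step 3: keep only the pairs whose states differ, or where one side is missing.
def state_mismatches(merged)
  merged.reject do |m|
    m[:task] && m[:current_task] && m[:task][:state] == m[:current_task][:state]
  end
end

# Hypothetical sample data standing in for the two collections.
tasks = [
  { _id: 't1', subscription_id: 's1', pos: 0, state: 10 },
  { _id: 't2', subscription_id: 's1', pos: 1, state: 40 },
  { _id: 't3', subscription_id: 's2', pos: 0, state: 20 }
]
subscriptions = [
  { _id: 's1', current_task: { task_id: 't2', pos: 1, state: 30 } }, # stale copy
  { _id: 's2', current_task: { task_id: 't3', pos: 0, state: 20 } }  # consistent copy
]

merged = merge_with_current_tasks(subscriptions, last_task_by_subscription(tasks))
state_mismatches(merged).each { |m| puts m[:_id] }
```

With this sample data only the subscription whose embedded state is stale should be reported.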

Difficulties encountered

Despite reading the official MongoDB and Mongoid documentation, and other examples in miscellaneous blogs such as "MongoDB Map Re-Reduce and joins – performance tuning" and "MongoDB, Mongoid, MapReduce and Embedded Documents", I still cannot come up with a working solution for the rereduce step.

Non-functional solution written so far:

# map/reduce of tasks to get the last one of each subscription
last_task_map = %Q{
  function() {
    var key = this.subscription_id;
    var value = {
        task: {
          pos: this.pos,
          task_id: this._id,
          state: this.state
        },
        current_task: null
    };
    emit(key, value);
  }
}
last_task_reduce = %Q{
  function(key, tasks) {
    var last_task = tasks[0];
    for ( var i=1; i < tasks.length; i++ ) {
      if(tasks[i].pos > last_task.pos) {
        last_task = tasks[i];
      }
    }
    
    var value = {
      task: {pos: last_task.pos, task_id: last_task.task_id, state: last_task.state},
      current_task: null
    };
    return value;
  }
}

# map/reduce of `current_task`s to be merged with previous results
subscription_map = %Q{
  function() {
    if(!this.current_task) {
      return;
    }
    var key = this._id;
    var value = {
      task: null,
      current_task: {
        pos: this.current_task.pos,
        task_id: this.current_task.task_id,
        state: this.current_task.state,
        source: 'current_task',
      }
    };
    emit(key, value);
  };
}

reduce = %Q{
  function(key, tasks) {
    if(tasks[0].current_task == nill) {
      return {task: tasks[0].task, current_task: tasks[1].current_task};
    }
    return {task: tasks[1].task, current_task: tasks[0].current_task};
  }
}


buffer = 'current_task_consistency'
# temporary collection seems unremoved when serially calling the script with 
# `load` in a `rails c` prompt, so we drop it to avoid unwanted glitch merge
Mongoid.default_client[buffer].drop
t = Task.map_reduce(last_task_map, last_task_reduce).out(replace: buffer)
s = Subscription.map_reduce(subscription_map, reduce).out(reduce: buffer)
t.each{ |e| puts e } # ok: `{"_id"=>BSON::ObjectId('592dd603e138236671587b04'), "value"=>{"task"=>{"pos"=>0.0, "task_id"=>BSON::ObjectId('592dd604e138236671587b0f'), "state"=>40.0}, "current_task"=>nil}}`
puts t.counts # ok: {"input"=>83900, "emit"=>83900, "reduce"=>36115, "output"=>28625}
s.each{ |e| puts e } # ko: {"_id"=>BSON::ObjectId('592dd603e138236671587b04'), "value"=>{"task"=>nil, "current_task"=>{"pos"=>0.0, "task_id"=>BSON::ObjectId('592dd604e138236671587b0f'), "state"=>40.0, "source"=>"current_task"}}}
puts s.counts # ko: {"input"=>28632, "emit"=>28624, "reduce"=>0, "output"=>28624}

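The expected result of the second map/reduce is a merge of the current_task_consistency and subscription_map results, which should both pass through reduce. According to the counts, however, no reduce is performed, and indeed the output of the s elements shows that no task key is assigned a value from current_task_consistency.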
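The merge expected from `out(reduce: buffer)` can be modelled in plain Ruby. MongoDB documents this output action as: when a key already exists in the target collection, the reduce function is applied to the existing and the new document, and the result overwrites the existing one. The sketch below is a simplified model of that behaviour, not the driver's implementation; `out_reduce` and `merge_pair` are hypothetical names, and since the order in which the two values are passed is an assumption, the reducer is written to work in either order.

```ruby
# Ruby port of an order-agnostic merge reducer: pick the task from whichever
# value carries one, and the current_task from whichever value carries one.
def merge_pair(values)
  with_task    = values.find { |v| v[:task] }
  with_current = values.find { |v| v[:current_task] }
  {
    task: with_task && with_task[:task],
    current_task: with_current && with_current[:current_task]
  }
end

# Simplified model of the `reduce` output action: a new key is inserted as is,
# an existing key is replaced by reducer.call(key, [existing, incoming]).
def out_reduce(collection, results, &reducer)
  results.each do |key, value|
    collection[key] =
      collection.key?(key) ? reducer.call(key, [collection[key], value]) : value
  end
  collection
end

# Hypothetical content of the buffer after the first map/reduce ...
buffer = { 's1' => { task: { pos: 1, state: 40 }, current_task: nil } }
# ... and hypothetical results of the second map/reduce.
incoming = { 's1' => { task: nil, current_task: { pos: 1, state: 30 } } }

out_reduce(buffer, incoming) { |_key, values| merge_pair(values) }
p buffer['s1']
```

After the call, the entry for the shared key should hold both the task and the current_task side by side, which is the shape the report step relies on.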

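Questions related to the exposed problem

Additional notes

The third step, generating the report, is intended to be implemented as a finalize function applied to the second map/reduce. A third map/reduce may or may not be a better approach. Overall, the structure of the implementation may be very poor, at least from a performance point of view, and feedback on this point is also welcome.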

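The first problem with the proposed solution was simply a Ruby/JavaScript syntax confusion: nill instead of null. Unfortunately the script failed silently, at least in the pry console where I ran load current_task_consistency.rb.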

Here is a working solution, with two map/reduce passes and one query on the resulting temporary collection.

# map/reduce of tasks to get the last one of each subscription
last_task_map = %Q{
  function() {
    var key = this.subscription_id;
    var value = {
        task: {
          pos: this.pos,
          task_id: this._id,
          state: this.state
        },
        current_task: null
    };
    emit(key, value);
  }
}
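last_task_reduce = %Q{
  function(key, tasks) {
    // Each element has the shape emitted by the map function:
    // {task: {pos, task_id, state}, current_task: null},
    // so pos must be read through the task sub-document.
    var last_task = tasks[0];
    for ( var i=1; i < tasks.length; i++ ) {
      if(tasks[i].task.pos > last_task.task.pos) {
        last_task = tasks[i];
      }
    }

    // Return the same shape as the emitted values so that a rereduce works.
    return {task: last_task.task, current_task: null};
  }
}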

# map/reduce of `current_task`s merged side by side with the corresponding
# subscription last task 
subscription_map = %Q{
  function() {
    if(!this.current_task) {
      return;
    }
    var key = this._id;
    var value = {
      task: null,
      current_task: {
        pos: this.current_task.pos,
        task_id: this.current_task.task_id,
        state: this.current_task.state,
      }
    };
    emit(key, value);
  };
}

subscription_reduce = %Q{
  function(key, tasks) {
    if(tasks[0].current_task == null) {
      return {task: tasks[0].task, current_task: tasks[1].current_task};
    }
    return {task: tasks[1].task, current_task: tasks[0].current_task};
  }
}

buffer = 'current_task_consistency'
# temporary collection seems unremoved when serially calling the script with 
# `load` in a `rails c` prompt, so we drop it to avoid unwanted merge glitch
Mongoid.default_client[buffer].drop

Task.map_reduce(last_task_map, last_task_reduce).
  out(replace: buffer).
  execute

Subscription.
  map_reduce(subscription_map, subscription_reduce).
  out(reduce: buffer).
  execute

ascertain_inconsistency = %Q{
  this.value.current_task == null ||
    this.value.current_task.state != this.value.task.state
}

inconsistencies = Mongoid.default_client['current_task_consistency'].
  find( { "$where": ascertain_inconsistency } )
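As an alternative to the server-side `$where` expression, the same check could be applied client-side while iterating the buffer collection. The following is only a sketch under assumptions: `inconsistent?` is a hypothetical helper, the documents are assumed to have the `{"_id" => ..., "value" => {"task" => ..., "current_task" => ...}}` shape shown in the outputs above, and, unlike the `$where` expression, a missing `task` is also treated as an inconsistency instead of raising an error.

```ruby
# Client-side counterpart of the $where predicate, with an extra guard
# for documents whose "task" is missing (an assumption, see above).
def inconsistent?(doc)
  value   = doc['value'] || {}
  current = value['current_task']
  task    = value['task']
  return true if current.nil? || task.nil?

  current['state'] != task['state']
end

# Hypothetical documents shaped like those of the buffer collection.
docs = [
  { '_id' => 's1', 'value' => { 'task' => { 'state' => 40.0 }, 'current_task' => { 'state' => 30.0 } } },
  { '_id' => 's2', 'value' => { 'task' => { 'state' => 20.0 }, 'current_task' => { 'state' => 20.0 } } },
  { '_id' => 's3', 'value' => { 'task' => { 'state' => 10.0 }, 'current_task' => nil } }
]

docs.select { |d| inconsistent?(d) }.each { |d| puts d['_id'] }
```

In the script above, the `docs` array would be replaced by the documents read from `Mongoid.default_client[buffer].find`; this moves the filtering cost to the Ruby process, which may or may not be preferable depending on the collection size.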