高级 MongoDB upsert

Advanced MongoDB upsert

假设我们有 3 个服务并行工作并写入我们的 MongoDB 存储。他们创建记录,其中包含以下信息:

{
   guid: GUID,
   ts: Timestamp,
   data: Object
}

因为 MongoDB 存储在某一时刻对于特定的 GUID 应该是空的,或者是最新的(最大 ts)记录。

小例子:

  1. 1 使用 {guid: 1, ts: 10, data: {}} 调用 - 插入 {guid: 1, ts: 10, data: {}}
  2. 2 调用 {guid: 1, ts: 5, data: {}} - 没有更新
  3. 3 使用 {guid: 1, ts: 15, data: {}} 调用 - 使用 ts: 15 和新数据更新文档。

换句话说,如果没有提供GUID的文档,我们必须插入记录,并在这样的guid已经存在并且ts大于现有记录的情况下更新记录。不要更新记录,如果 ts 小于现有记录。

我知道这是某种更新插入操作,但我无法想象如何处理它。尝试使用 findAndModify, mapReduce or $max update 运算符,但没有运气 atm。提前谢谢你。

我们需要建模的逻辑分为三个部分。给定一个新文档

var newDoc = { "guid" : 1, "ts" : 10, "data" : "asdf" }
  1. 没有 guidnewDoc 相同的文档 - 插入 newDoc
  2. 有一个文档 oldDocnewDoc 具有相同的 guid 并且 oldDoc.ts < newDoc.ts - newDoc 覆盖 oldDoc
  3. 有一个文档 oldDocnewDocoldDoc.ts > newDoc.ts 具有相同的 guid - newDoc 无效

如果 ts 值相等,我想我们不关心保留 oldDocnewDoc 中的哪一个。

没有一个步骤可以处理所有这些逻辑。我们可以使用一个更新来处理第一个条件和第二个或第三个条件,但是一起处理这三个条件需要多个步骤。 shell 的示例代码,假设 { "guid" : 1 } 上的唯一索引:

// just try to insert
var wr = db.test.insert(newDoc)
if (wr.getWriteError() && wr.getWriteError.code === 11000) {
    // this is case 2 or 3 - 11000 is duplicate key error
    // so update if case 2
    db.test.update({ "guid" : newDoc.guid, "ts" : { "$lte" : newDoc.ts } }, newDoc)
}
else {
    // this is case 1 - newDoc was inserted if there weren't other errors
}

我认为这个顺序适用于并发请求。如果我们假设我们有两个工人 Alice 和 Bob 想要使用相同的 guid,那么,如果 guid 不存在,Alice 和 Bob 中的一个将首先执行插入,另一个将执行插入收到索引错误。两者都将以某种顺序 运行 第二次更新,无论如何,较高的时间戳将获胜。如果 guid 确实存在,我们将减少到它不存在时的第二部分。我认为我们在所有情况下都可以,但并发很难,所以你应该自己考虑一下。并进行测试。

重要的是这些更新只针对一个文档 - 我依赖 insert/update 一个文档是原子的。

如果我们在 guid 字段上稍微调整 wdberkeley 的 . If we create an unique index 并使用此查询,这可以在单个原子操作中完成:

db.guids.replaceOne({guid : newDoc.guid, ts : {$lte : newDoc.ts}}, newDoc, {upsert:true});

现在三种情况是这样的:

  1. 没有与newDoc具有相同guid的文档 - 插入newDoc

  2. 有一个文档 oldDocnewDoc 具有相同的 guid 并且 oldDoc.ts < newDoc.ts - newDoc 覆盖 oldDoc

  3. 有一个文档 oldDocnewDocoldDoc.ts > newDoc.ts 具有相同的 guid - 插入 (upsert) 将失败并出现我们可以忽略的错误:

WriteError({
        "index" : 0,
        "code" : 11000,
        "errmsg" : "E11000 duplicate key error collection: test.guids index: guid_1 dup key: { : 1.0 }",
        "op" : {
            "q" : {
                "guid" : 1,
                "ts" : {
                    "$lte" : 14
                }
            },
            "u" : {
                "guid" : 1,
                "ts" : 14,
                "data" : "asdf"
            },
            "multi" : false,
            "upsert" : true
        }
    })