您如何始终如一地迁移大型 MongoDB 集合?

How do you consistently migrate a large MongoDB collection?

我正尝试迁移大量 MongoDB 约 600k 文档,如下所示:

    for await (const doc of db.collection('collection').find({
        legacyProp: { $exists: true },
    })) {
        // additional data fetching from separate collections here
        const newPropValue = await fetchNewPropValue(doc._id)
        await db.collection('collection').findOneAndUpdate({ _id: doc._id }, [{ $set: { newProp: newPropValue } }, { $unset: ['legacyProp'] }])
    }
}

迁移脚本完成后,数据仍在更新约 30 分钟左右。我通过计算包含 legacyProp 属性:

的文档的文档数得出结论
db.collection.countDocuments({ legacyProp: { $exists: true } })

在随后的调用中不断减少。一段时间后,更新停止,包含 legacy prop 的文档的最终文档数约为 300k,因此更新无提示地失败,导致数据丢失。我很好奇到底发生了什么,最重要的是,如何在不丢失任何数据的情况下更新大型 MongoDB 集合?请记住,在每次更新操作之前都涉及额外的数据提取。

我的第一次尝试是在聚合管道中构建 fetchNewPropValue() 的函数。

看看Aggregation Pipeline Operators

如果这不可能,那么您可以尝试将所有 newPropValue 放入数组中并像这样使用它。 60 万个属性应该很容易放入您的 RAM。

const newPropValues = await fetchNewPropValue() // getting all new properties as array [{_id: ..., val: ...}, {_id: ..., val: ...}, ...]
db.getCollection('collection').updateMany(
   { legacyProp: { $exists: true } },
   [
      {
         $set: {
            newProp: {
               $first: {
                  $filter: { input: newPropValues, cond: { $eq: ["$_id", "$$this._id"] } }
               }
            }
         }
      },
      { $set: { legacyProp: "$$REMOVE", newProp: "$$newProp.val" } }
   ]
)

或者您可以尝试批量写入:

let bulkOperations = []
db.getCollection('collection').find({ legacyProp: { $exists: true } }).forEach(doc => {
   const newPropValue = await fetchNewPropValue(doc._id);
   bulkOperations.push({
      updateOne: {
         filter: { _id: doc._id },
         update: {
            $set: { newProp: newPropValue },
            $unset: { legacyProp: "" }
         }
      }
   });
   if (bulkOperations.length > 10000) {
      db.getCollection('collection').bulkWrite(bulkOperations, { ordered: false });
      bulkOperations = [];
   }
})
if (bulkOperations.length > 0)
   db.getCollection('collection').bulkWrite(bulkOperations, { ordered: false })