您如何始终如一地迁移大型 MongoDB 集合?
How do you consistently migrate a large MongoDB collection?
我正尝试迁移大量 MongoDB 约 600k 文档,如下所示:
for await (const doc of db.collection('collection').find({
legacyProp: { $exists: true },
})) {
// additional data fetching from separate collections here
const newPropValue = await fetchNewPropValue(doc._id)
await db.collection('collection').findOneAndUpdate({ _id: doc._id }, [{ $set: { newProp: newPropValue } }, { $unset: ['legacyProp'] }])
}
}
迁移脚本完成后,数据仍在更新约 30 分钟左右。我通过计算包含 legacyProp
属性:
的文档的文档数得出结论
db.collection.countDocuments({ legacyProp: { $exists: true } })
在随后的调用中不断减少。一段时间后,更新停止,包含 legacy prop 的文档的最终文档数约为 300k,因此更新无提示地失败,导致数据丢失。我很好奇到底发生了什么,最重要的是,如何在不丢失任何数据的情况下更新大型 MongoDB 集合?请记住,在每次更新操作之前都涉及额外的数据提取。
我的第一次尝试是在聚合管道中构建 fetchNewPropValue()
的函数。
看看Aggregation Pipeline Operators
如果这不可能,那么您可以尝试将所有 newPropValue 放入数组中并像这样使用它。 60 万个属性应该很容易放入您的 RAM。
const newPropValues = await fetchNewPropValue() // getting all new properties as array [{_id: ..., val: ...}, {_id: ..., val: ...}, ...]
db.getCollection('collection').updateMany(
{ legacyProp: { $exists: true } },
[
{
$set: {
newProp: {
$first: {
$filter: { input: newPropValues, cond: { $eq: ["$_id", "$$this._id"] } }
}
}
}
},
{ $set: { legacyProp: "$$REMOVE", newProp: "$$newProp.val" } }
]
)
或者您可以尝试批量写入:
let bulkOperations = []
db.getCollection('collection').find({ legacyProp: { $exists: true } }).forEach(doc => {
const newPropValue = await fetchNewPropValue(doc._id);
bulkOperations.push({
updateOne: {
filter: { _id: doc._id },
update: {
$set: { newProp: newPropValue },
$unset: { legacyProp: "" }
}
}
});
if (bulkOperations.length > 10000) {
db.getCollection('collection').bulkWrite(bulkOperations, { ordered: false });
bulkOperations = [];
}
})
if (bulkOperations.length > 0)
db.getCollection('collection').bulkWrite(bulkOperations, { ordered: false })
我正尝试迁移大量 MongoDB 约 600k 文档,如下所示:
for await (const doc of db.collection('collection').find({
legacyProp: { $exists: true },
})) {
// additional data fetching from separate collections here
const newPropValue = await fetchNewPropValue(doc._id)
await db.collection('collection').findOneAndUpdate({ _id: doc._id }, [{ $set: { newProp: newPropValue } }, { $unset: ['legacyProp'] }])
}
}
迁移脚本完成后,数据仍在更新约 30 分钟左右。我通过计算包含 legacyProp
属性:
db.collection.countDocuments({ legacyProp: { $exists: true } })
在随后的调用中不断减少。一段时间后,更新停止,包含 legacy prop 的文档的最终文档数约为 300k,因此更新无提示地失败,导致数据丢失。我很好奇到底发生了什么,最重要的是,如何在不丢失任何数据的情况下更新大型 MongoDB 集合?请记住,在每次更新操作之前都涉及额外的数据提取。
我的第一次尝试是在聚合管道中构建 fetchNewPropValue()
的函数。
看看Aggregation Pipeline Operators
如果这不可能,那么您可以尝试将所有 newPropValue 放入数组中并像这样使用它。 60 万个属性应该很容易放入您的 RAM。
const newPropValues = await fetchNewPropValue() // getting all new properties as array [{_id: ..., val: ...}, {_id: ..., val: ...}, ...]
db.getCollection('collection').updateMany(
{ legacyProp: { $exists: true } },
[
{
$set: {
newProp: {
$first: {
$filter: { input: newPropValues, cond: { $eq: ["$_id", "$$this._id"] } }
}
}
}
},
{ $set: { legacyProp: "$$REMOVE", newProp: "$$newProp.val" } }
]
)
或者您可以尝试批量写入:
let bulkOperations = []
db.getCollection('collection').find({ legacyProp: { $exists: true } }).forEach(doc => {
const newPropValue = await fetchNewPropValue(doc._id);
bulkOperations.push({
updateOne: {
filter: { _id: doc._id },
update: {
$set: { newProp: newPropValue },
$unset: { legacyProp: "" }
}
}
});
if (bulkOperations.length > 10000) {
db.getCollection('collection').bulkWrite(bulkOperations, { ordered: false });
bulkOperations = [];
}
})
if (bulkOperations.length > 0)
db.getCollection('collection').bulkWrite(bulkOperations, { ordered: false })