如何在删除文档时从 MongoDB changeStream 获取 fullDocument?
How to get fullDocument from MongoDB changeStream when a document is deleted?
我的代码
我有一个包含两个集合的 MongoDB,Items
和 Calculations
。
Items
value: Number
date: Date
Calculations
calculation: Number
start_date: Date
end_date: Date
A Calculation
是基于数据库中所有 Items
的 Item
值的存储计算,其日期介于 Calculation
的开始日期和结束日期。
Mongo 更改流
我认为创建/更新 Calculations
的一个好方法是在 Items
集合上创建一个 Mongo 更改流,它监听 Items
集合的更改然后重新计算相关的 Calculations
.
问题是,根据 Mongo Change Event docs,删除文档时,会省略 fullDocument
字段,这会阻止我访问已删除的 Item
的日期会通知哪些 Calculations
应该更新。
问题
有什么方法可以访问由于文档删除而触发的 Mongo 更改事件的 fullDocument
?
不,我不相信有办法。来自 https://docs.mongodb.com/manual/changeStreams/#event-notification:
Change streams only notify on data changes that have persisted to a majority of data-bearing members in the replica set.
当文档被删除并且删除在副本集中的大多数节点上持续存在时,文档不再存在于副本集中。因此,变更流不能 return 不再存在的东西。
您的问题的解决方案是 transactions in MongoDB 4.0。也就是说,您可以在单个原子操作中调整 Calculations
并删除相应的 Items
。
fullDocument
删除文档不返回
但有一个解决方法。
在删除文档之前,设置一个提示字段。 (显然使用不与您当前属性冲突的名称。)
await myCollection.updateOne({_id:theId}, {_deleting: true})
await myCollection.deleteOne({_id:theId})
这将触发流中的最后一个 change
事件,并提示文档正在被删除。然后在你的流观察器中,你简单地检查这个值。
stream.on('change', event => {
if (!event.fullDocument) {
// The document was deleted
}
else if (event.fullDocument._deleting) {
// The document is going to be deleted
}
else {
// Document created or updated
}
})
我的 oplog 更新速度不够快,更新正在查找已删除的文档,因此我需要添加一个小的延迟才能使其正常工作。
myCollection.updateOne({_id:theId}, {_deleting: true})
setTimeout( ()=> {
myCollection.deleteOne({_id:theId})
}, 100)
如果文档最初不存在,则不会更新或删除,因此不会触发任何内容。
使用 TTL 索引
实现此功能的另一种方法是添加一个 ttl 索引,然后将该字段设置为当前时间。这将首先触发更新,然后自动删除文档。
myCollection.setIndex({_deleting:1}, {expireAfterSeconds:0})
myCollection.updateOne({_id:theId}, {$set:{_deleting:new Date()}})
这种方法的问题是 mongo 仅在特定时间间隔内修剪 TTL 文档,如文档中所述,60 秒或更长时间,所以我更喜欢使用第一种方法。
我的代码
我有一个包含两个集合的 MongoDB,Items
和 Calculations
。
Items
value: Number
date: Date
Calculations
calculation: Number
start_date: Date
end_date: Date
A Calculation
是基于数据库中所有 Items
的 Item
值的存储计算,其日期介于 Calculation
的开始日期和结束日期。
Mongo 更改流
我认为创建/更新 Calculations
的一个好方法是在 Items
集合上创建一个 Mongo 更改流,它监听 Items
集合的更改然后重新计算相关的 Calculations
.
问题是,根据 Mongo Change Event docs,删除文档时,会省略 fullDocument
字段,这会阻止我访问已删除的 Item
的日期会通知哪些 Calculations
应该更新。
问题
有什么方法可以访问由于文档删除而触发的 Mongo 更改事件的 fullDocument
?
不,我不相信有办法。来自 https://docs.mongodb.com/manual/changeStreams/#event-notification:
Change streams only notify on data changes that have persisted to a majority of data-bearing members in the replica set.
当文档被删除并且删除在副本集中的大多数节点上持续存在时,文档不再存在于副本集中。因此,变更流不能 return 不再存在的东西。
您的问题的解决方案是 transactions in MongoDB 4.0。也就是说,您可以在单个原子操作中调整 Calculations
并删除相应的 Items
。
fullDocument
删除文档不返回
但有一个解决方法。
在删除文档之前,设置一个提示字段。 (显然使用不与您当前属性冲突的名称。)
await myCollection.updateOne({_id:theId}, {_deleting: true})
await myCollection.deleteOne({_id:theId})
这将触发流中的最后一个 change
事件,并提示文档正在被删除。然后在你的流观察器中,你简单地检查这个值。
stream.on('change', event => {
if (!event.fullDocument) {
// The document was deleted
}
else if (event.fullDocument._deleting) {
// The document is going to be deleted
}
else {
// Document created or updated
}
})
我的 oplog 更新速度不够快,更新正在查找已删除的文档,因此我需要添加一个小的延迟才能使其正常工作。
myCollection.updateOne({_id:theId}, {_deleting: true})
setTimeout( ()=> {
myCollection.deleteOne({_id:theId})
}, 100)
如果文档最初不存在,则不会更新或删除,因此不会触发任何内容。
使用 TTL 索引
实现此功能的另一种方法是添加一个 ttl 索引,然后将该字段设置为当前时间。这将首先触发更新,然后自动删除文档。
myCollection.setIndex({_deleting:1}, {expireAfterSeconds:0})
myCollection.updateOne({_id:theId}, {$set:{_deleting:new Date()}})
这种方法的问题是 mongo 仅在特定时间间隔内修剪 TTL 文档,如文档中所述,60 秒或更长时间,所以我更喜欢使用第一种方法。