如何监听 mongodb 变化流中嵌套数组特定字段的变化?
How to listen to change in specific field of a nested array in mongodb change streams?
这是我在 mongodb.
中的 BSON 文档的结构
{
"tournament_id": "P1oi12mwj10b1b",
"matches": [
{
"date_order": 1,
"matches": [
{
"match_id": "1A4i0sp34"
"time_order": 1,
"win": "team1",
"team1": "bar",
"team2": "psg"
},
{
"match_id": "3A4j0sp26"
"time_order": 2,
"win": "",
"team1": "rma",
"team2": "sev"
}
]
},
{
"date_order": 2,
"matches": [
{
"match_id": "2B4k0sp29"
"time_order": 1,
"win": "",
"team1": "manU",
"team2": "manC"
},
{
"match_id": "4A4i0sp31"
"time_order": 2,
"win": "",
"team1": "chelsea",
"team2": "arsenal"
}
]
}
]
}
我想制作一个通知系统,在比赛结束时发送通知。换句话说,每当 win
字段的值发生变化时,我想捕获更新的匹配项。我正在使用 mongodb 更改流。
例如,如果匹配 match_id 3A4j0sp26 刚刚完成,我想打印那个对象。
{
"match_id": "3A4j0sp26"
"time_order": 2,
"win": "team2",
"team1": "rma",
"team2": "sev"
# If possible I also want to find these fields,
"tournament_id": "P1oi12mwj10b1b",
"date_order": 1
}
我试过这样做。
import pymongo
from bson.json_util import dumps
MONGO_URI = 'mongodb://localhost/mydb'
client = pymongo.MongoClient(MONGO_URI)
filters = [] # How to correctly set this filter ???
'''
What I already tried but failed
filters = [{
'$match': {
'$and': [
{'updateDescription.updatedFields.matches': {'$exists': 'true'}}, # This line needs fixing.
{'operationType': {'$in': ['replace', 'update']}}
]
}
}]
'''
change_stream = client.mydb.match.watch(filters)
for change in change_streams:
print(dumps(change))
我尝试在不应用过滤器的情况下进行调试。我将 match_id
3A4j0sp26 的 win
字段更新为 team2。
我得到了这个结果。
{
"_id": {
"_data": "8261252C2F000000012B022C0100296E5A1004D4D1F2A9AF33491089DE8C2A51537EBB46645F6964006461228AE88CF6743D054B8CEF0004"
},
"operationType": "replace",
"clusterTime": {
"$timestamp": {
"t": 1629826095,
"i": 1
}
},
"fullDocument": {
"_id": {
"$oid": "61228ae88cf6743d054b8cef"
},
"tournament_id": "P1oi12mwj10b1b",
"matches": [
{
"date_order": 1,
"matches": [
{
"match_id": "1A4i0sp34"
"time_order": 1,
"win": "team1", # This was updated earlier. I don't want this.
"team1": "bar",
"team2": "psg"
},
{
"match_id": "3A4j0sp26"
"time_order": 2,
"win": "team1", # This is the most recently updated.
"team1": "rma",
"team2": "sev"
}
]
}]
}
}
它显示数组中的所有元素,而不是刚刚更新的元素。
已编辑
仅更新“分数”字段后得到的结果。
{
"_id": {
"_data": "8261254598000000022B022C0100296E5A1004D4D1F2A9AF33491089DE8C2A51537EBB46645F6964006461228AE88CF6743D054B8CEF0004"
},
"operationType": "update",
"clusterTime": {
"$timestamp": {
"t": 1629832600,
"i": 2
}
},
"ns": {
"db": "mydb",
"coll": "match"
},
"documentKey": {
"_id": {
"$oid": "61228ae88cf6743d054b8cef"
}
},
"updateDescription": {
"updatedFields": {
"matches": [
{
"date_order": 1,
"matches": [
{
"match_id": "1A4i0sp34"
"time_order": 1,
"win": "team1",
"team1": "bar",
"team2": "psg"
},
{
"match_id": "3A4j0sp26"
"time_order": 2,
"win": "team1",
"team1": "rma",
"team2": "sev"
}
]
}]
},
"removedFields": []
}
}
}
这取决于您如何进行更新。
一个简短的测试来证明:
插入文档并开始更改流
PRIMARY> db.updtest.insert({list:[
{item:"1",state:"running"},
{item:"2",state:"done"},
{item:"3",state:"unknown"}
]});
WriteResult({ "nInserted" : 1 })
PRIMARY> let stream = db.updtest.watch()
通过设置列表字段进行更新会导致 return 整个数组的更改事件:
PRIMARY> db.updtest.updateOne({},{$set:{list:[
{item:"1",state:"running"},
{item:"2",state:"done"},
{item:"3",state:"running"}
]}});
{ "acknowledged" : true, "matchedCount" : 1, "modifiedCount" : 1 }
PRIMARY> stream.next();
{
"_id" : {
"_data" : "82612577BE000000012B022C0100296E5A100436BFE3F91AF84C7CB04826F361BCE50346645F696400646125779D98787C286C5443050004"
},
"operationType" : "update",
"clusterTime" : Timestamp(1629845438, 1),
"ns" : {
"db" : "test",
"coll" : "updtest"
},
"documentKey" : {
"_id" : ObjectId("6125779d98787c286c544305")
},
"updateDescription" : {
"updatedFields" : {
"list" : [
{
"item" : "1",
"state" : "running"
},
{
"item" : "2",
"state" : "done"
},
{
"item" : "3",
"state" : "running"
}
]
},
"removedFields" : [ ]
}
}
仅更新一个子文档中的一个字段会导致仅包含已修改字段的更改事件:
PRIMARY> db.updtest.update({"list.item":"3"},{$set:{"list.$.state":"done"}});
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })
PRIMARY> stream.next();
{
"_id" : {
"_data" : "8261257879000000012B022C0100296E5A100436BFE3F91AF84C7CB04826F361BCE50346645F696400646125779D98787C286C5443050004"
},
"operationType" : "update",
"clusterTime" : Timestamp(1629845625, 1),
"ns" : {
"db" : "test",
"coll" : "updtest"
},
"documentKey" : {
"_id" : ObjectId("6125779d98787c286c544305")
},
"updateDescription" : {
"updatedFields" : {
"list.2.state" : "done"
},
"removedFields" : [ ]
}
}
如果您还使用更改流选项 return 完整文档,您将拥有更改字段周围的上下文。
这是我在 mongodb.
中的 BSON 文档的结构{
"tournament_id": "P1oi12mwj10b1b",
"matches": [
{
"date_order": 1,
"matches": [
{
"match_id": "1A4i0sp34"
"time_order": 1,
"win": "team1",
"team1": "bar",
"team2": "psg"
},
{
"match_id": "3A4j0sp26"
"time_order": 2,
"win": "",
"team1": "rma",
"team2": "sev"
}
]
},
{
"date_order": 2,
"matches": [
{
"match_id": "2B4k0sp29"
"time_order": 1,
"win": "",
"team1": "manU",
"team2": "manC"
},
{
"match_id": "4A4i0sp31"
"time_order": 2,
"win": "",
"team1": "chelsea",
"team2": "arsenal"
}
]
}
]
}
我想制作一个通知系统,在比赛结束时发送通知。换句话说,每当 win
字段的值发生变化时,我想捕获更新的匹配项。我正在使用 mongodb 更改流。
例如,如果匹配 match_id 3A4j0sp26 刚刚完成,我想打印那个对象。
{
"match_id": "3A4j0sp26"
"time_order": 2,
"win": "team2",
"team1": "rma",
"team2": "sev"
# If possible I also want to find these fields,
"tournament_id": "P1oi12mwj10b1b",
"date_order": 1
}
我试过这样做。
import pymongo
from bson.json_util import dumps
MONGO_URI = 'mongodb://localhost/mydb'
client = pymongo.MongoClient(MONGO_URI)
filters = [] # How to correctly set this filter ???
'''
What I already tried but failed
filters = [{
'$match': {
'$and': [
{'updateDescription.updatedFields.matches': {'$exists': 'true'}}, # This line needs fixing.
{'operationType': {'$in': ['replace', 'update']}}
]
}
}]
'''
change_stream = client.mydb.match.watch(filters)
for change in change_streams:
print(dumps(change))
我尝试在不应用过滤器的情况下进行调试。我将 match_id
3A4j0sp26 的 win
字段更新为 team2。
我得到了这个结果。
{
"_id": {
"_data": "8261252C2F000000012B022C0100296E5A1004D4D1F2A9AF33491089DE8C2A51537EBB46645F6964006461228AE88CF6743D054B8CEF0004"
},
"operationType": "replace",
"clusterTime": {
"$timestamp": {
"t": 1629826095,
"i": 1
}
},
"fullDocument": {
"_id": {
"$oid": "61228ae88cf6743d054b8cef"
},
"tournament_id": "P1oi12mwj10b1b",
"matches": [
{
"date_order": 1,
"matches": [
{
"match_id": "1A4i0sp34"
"time_order": 1,
"win": "team1", # This was updated earlier. I don't want this.
"team1": "bar",
"team2": "psg"
},
{
"match_id": "3A4j0sp26"
"time_order": 2,
"win": "team1", # This is the most recently updated.
"team1": "rma",
"team2": "sev"
}
]
}]
}
}
它显示数组中的所有元素,而不是刚刚更新的元素。
已编辑
仅更新“分数”字段后得到的结果。
{
"_id": {
"_data": "8261254598000000022B022C0100296E5A1004D4D1F2A9AF33491089DE8C2A51537EBB46645F6964006461228AE88CF6743D054B8CEF0004"
},
"operationType": "update",
"clusterTime": {
"$timestamp": {
"t": 1629832600,
"i": 2
}
},
"ns": {
"db": "mydb",
"coll": "match"
},
"documentKey": {
"_id": {
"$oid": "61228ae88cf6743d054b8cef"
}
},
"updateDescription": {
"updatedFields": {
"matches": [
{
"date_order": 1,
"matches": [
{
"match_id": "1A4i0sp34"
"time_order": 1,
"win": "team1",
"team1": "bar",
"team2": "psg"
},
{
"match_id": "3A4j0sp26"
"time_order": 2,
"win": "team1",
"team1": "rma",
"team2": "sev"
}
]
}]
},
"removedFields": []
}
}
}
这取决于您如何进行更新。
一个简短的测试来证明:
插入文档并开始更改流
PRIMARY> db.updtest.insert({list:[
{item:"1",state:"running"},
{item:"2",state:"done"},
{item:"3",state:"unknown"}
]});
WriteResult({ "nInserted" : 1 })
PRIMARY> let stream = db.updtest.watch()
通过设置列表字段进行更新会导致 return 整个数组的更改事件:
PRIMARY> db.updtest.updateOne({},{$set:{list:[
{item:"1",state:"running"},
{item:"2",state:"done"},
{item:"3",state:"running"}
]}});
{ "acknowledged" : true, "matchedCount" : 1, "modifiedCount" : 1 }
PRIMARY> stream.next();
{
"_id" : {
"_data" : "82612577BE000000012B022C0100296E5A100436BFE3F91AF84C7CB04826F361BCE50346645F696400646125779D98787C286C5443050004"
},
"operationType" : "update",
"clusterTime" : Timestamp(1629845438, 1),
"ns" : {
"db" : "test",
"coll" : "updtest"
},
"documentKey" : {
"_id" : ObjectId("6125779d98787c286c544305")
},
"updateDescription" : {
"updatedFields" : {
"list" : [
{
"item" : "1",
"state" : "running"
},
{
"item" : "2",
"state" : "done"
},
{
"item" : "3",
"state" : "running"
}
]
},
"removedFields" : [ ]
}
}
仅更新一个子文档中的一个字段会导致仅包含已修改字段的更改事件:
PRIMARY> db.updtest.update({"list.item":"3"},{$set:{"list.$.state":"done"}});
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })
PRIMARY> stream.next();
{
"_id" : {
"_data" : "8261257879000000012B022C0100296E5A100436BFE3F91AF84C7CB04826F361BCE50346645F696400646125779D98787C286C5443050004"
},
"operationType" : "update",
"clusterTime" : Timestamp(1629845625, 1),
"ns" : {
"db" : "test",
"coll" : "updtest"
},
"documentKey" : {
"_id" : ObjectId("6125779d98787c286c544305")
},
"updateDescription" : {
"updatedFields" : {
"list.2.state" : "done"
},
"removedFields" : [ ]
}
}
如果您还使用更改流选项 return 完整文档,您将拥有更改字段周围的上下文。