How to listen for changes to a specific field of a nested array in MongoDB change streams?

This is the structure of my BSON document in MongoDB:

{
    "tournament_id": "P1oi12mwj10b1b",
    "matches": [
        {
            "date_order": 1,
            "matches": [
                {
                    "match_id": "1A4i0sp34",
                    "time_order": 1,
                    "win": "team1",
                    "team1": "bar",
                    "team2": "psg"
                },
                {
                    "match_id": "3A4j0sp26",
                    "time_order": 2,
                    "win": "",
                    "team1": "rma",
                    "team2": "sev"
                }
            ]
        },
        {
            "date_order": 2,
            "matches": [
                {
                    "match_id": "2B4k0sp29",
                    "time_order": 1,
                    "win": "",
                    "team1": "manU",
                    "team2": "manC"
                },
                {
                    "match_id": "4A4i0sp31",
                    "time_order": 2,
                    "win": "",
                    "team1": "chelsea",
                    "team2": "arsenal"
                }
            ]
        }
    ]
}

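I want to build a notification system that sends a notification when a match finishes. In other words, whenever the value of the win field changes, I want to capture the updated match. I am using MongoDB change streams.

For example, if the match with match_id 3A4j0sp26 has just finished, I want to print that object: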

{
      "match_id": "3A4j0sp26",
      "time_order": 2,
      "win": "team2",
      "team1": "rma",
      "team2": "sev",
      # If possible, I would also like to get these fields:
      "tournament_id": "P1oi12mwj10b1b",
      "date_order": 1
}

This is what I tried:

import pymongo
from bson.json_util import dumps

MONGO_URI = 'mongodb://localhost/mydb'
client = pymongo.MongoClient(MONGO_URI)

filters = []  # How to correctly set this filter ???
'''
What I already tried but failed
filters = [{
        '$match': {
            '$and': [
                {'updateDescription.updatedFields.matches': {'$exists': 'true'}},  # This line needs fixing.
                {'operationType': {'$in': ['replace', 'update']}}
            ]
        }
    }]
'''


change_stream = client.mydb.match.watch(filters)
for change in change_stream:
    print(dumps(change))

I tried debugging without applying any filter. I updated the win field of the match with match_id 3A4j0sp26 to team2 and got this result:

{
  "_id": {
    "_data": "8261252C2F000000012B022C0100296E5A1004D4D1F2A9AF33491089DE8C2A51537EBB46645F6964006461228AE88CF6743D054B8CEF0004"
  },
  "operationType": "replace",
  "clusterTime": {
    "$timestamp": {
      "t": 1629826095,
      "i": 1
    }
  },
  "fullDocument": {
    "_id": {
      "$oid": "61228ae88cf6743d054b8cef"
    },
    "tournament_id": "P1oi12mwj10b1b",
    "matches": [
        {
            "date_order": 1,
            "matches": [
                {
                    "match_id": "1A4i0sp34",
                    "time_order": 1,
                    "win": "team1",    # This was updated earlier. I don't want this.
                    "team1": "bar",
                    "team2": "psg"
                },
                {
                    "match_id": "3A4j0sp26",
                    "time_order": 2,
                    "win": "team1",     # This is the most recently updated.
                    "team1": "rma",
                    "team2": "sev"
                }
            ]
        }]
   }
}

It shows every element of the array, not just the one that was updated.

Edit

This is the result I got after updating only the "score" field:

{
  "_id": {
    "_data": "8261254598000000022B022C0100296E5A1004D4D1F2A9AF33491089DE8C2A51537EBB46645F6964006461228AE88CF6743D054B8CEF0004"
  },
  "operationType": "update",
  "clusterTime": {
    "$timestamp": {
      "t": 1629832600,
      "i": 2
    }
  },
  "ns": {
    "db": "mydb",
    "coll": "match"
  },
  "documentKey": {
    "_id": {
      "$oid": "61228ae88cf6743d054b8cef"
    }
  },
  "updateDescription": {
    "updatedFields": {
       "matches": [
        {
            "date_order": 1,
            "matches": [
                {
                    "match_id": "1A4i0sp34",
                    "time_order": 1,
                    "win": "team1",
                    "team1": "bar",
                    "team2": "psg"
                },
                {
                    "match_id": "3A4j0sp26",
                    "time_order": 2,
                    "win": "team1",
                    "team1": "rma",
                    "team2": "sev"
                }
            ]
        }]
    },
    "removedFields": []
  }
}

It depends on how you perform the update.

A short test to demonstrate:

Insert a document and start a change stream:

PRIMARY> db.updtest.insert({list:[
                         {item:"1",state:"running"},
                         {item:"2",state:"done"},
                         {item:"3",state:"unknown"}
                  ]});

WriteResult({ "nInserted" : 1 })

PRIMARY> let stream = db.updtest.watch()

Updating by setting the whole list field produces a change event that returns the entire array:

PRIMARY> db.updtest.updateOne({},{$set:{list:[
                         {item:"1",state:"running"},
                         {item:"2",state:"done"},
                         {item:"3",state:"running"}
                  ]}});

{ "acknowledged" : true, "matchedCount" : 1, "modifiedCount" : 1 }

PRIMARY> stream.next();

{
    "_id" : {
        "_data" : "82612577BE000000012B022C0100296E5A100436BFE3F91AF84C7CB04826F361BCE50346645F696400646125779D98787C286C5443050004"
    },
    "operationType" : "update",
    "clusterTime" : Timestamp(1629845438, 1),
    "ns" : {
        "db" : "test",
        "coll" : "updtest"
    },
    "documentKey" : {
        "_id" : ObjectId("6125779d98787c286c544305")
    },
    "updateDescription" : {
        "updatedFields" : {
            "list" : [
                {
                    "item" : "1",
                    "state" : "running"
                },
                {
                    "item" : "2",
                    "state" : "done"
                },
                {
                    "item" : "3",
                    "state" : "running"
                }
            ]
        },
        "removedFields" : [ ]
    }
}

Updating only one field in one subdocument produces a change event that contains only the modified field:

PRIMARY> db.updtest.update({"list.item":"3"},{$set:{"list.$.state":"done"}});

WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })

PRIMARY> stream.next();

{
    "_id" : {
        "_data" : "8261257879000000012B022C0100296E5A100436BFE3F91AF84C7CB04826F361BCE50346645F696400646125779D98787C286C5443050004"
    },
    "operationType" : "update",
    "clusterTime" : Timestamp(1629845625, 1),
    "ns" : {
        "db" : "test",
        "coll" : "updtest"
    },
    "documentKey" : {
        "_id" : ObjectId("6125779d98787c286c544305")
    },
    "updateDescription" : {
        "updatedFields" : {
            "list.2.state" : "done"
        },
        "removedFields" : [ ]
    }
}
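
Applied to the schema in the question, the same idea means updating only the win field of one match instead of rewriting the whole matches array. The following is a minimal pymongo sketch, assuming MongoDB 3.6 or later (for arrayFilters). The helper names build_win_update and set_winner, and the identifier m, are made up for illustration and are not part of pymongo.

```python
def build_win_update(tournament_id, match_id, winner):
    """Build (query, update, array_filters) for a targeted update of one match."""
    query = {"tournament_id": tournament_id}
    # $[] walks every date bucket in the outer array;
    # $[m] selects only the inner element accepted by the array filter below.
    update = {"$set": {"matches.$[].matches.$[m].win": winner}}
    array_filters = [{"m.match_id": match_id}]
    return query, update, array_filters


def set_winner(collection, tournament_id, match_id, winner):
    """Apply the targeted update; `collection` is expected to be a pymongo Collection."""
    query, update, array_filters = build_win_update(tournament_id, match_id, winner)
    return collection.update_one(query, update, array_filters=array_filters)
```

With an update of this shape, the change event should report a single dotted key such as matches.0.matches.1.win in updatedFields, in the same way list.2.state appears above.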

If you also use the change stream option that returns the full document, you will have the context around the changed field.
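
For the question's use case, both points can be combined. This is only a sketch, and it assumes the application updates the win field directly so that updatedFields contains keys like matches.0.matches.1.win. It filters the stream on those keys and passes full_document="updateLookup" to recover tournament_id and date_order. The names win_filter_pipeline, extract_finished_matches and watch_finished_matches, and the temporary field changed, are illustrative and not part of pymongo.

```python
import re

# Key shape produced by a targeted update, e.g. "matches.0.matches.1.win".
WIN_KEY = re.compile(r"^matches\.(\d+)\.matches\.(\d+)\.win$")


def win_filter_pipeline():
    """Aggregation stages that keep only update events touching a win field."""
    return [
        {"$match": {"operationType": "update"}},
        # Turn the dynamic updatedFields keys into [{k, v}, ...] so they can be matched.
        {"$addFields": {"changed": {"$objectToArray": "$updateDescription.updatedFields"}}},
        {"$match": {"changed.k": {"$regex": WIN_KEY.pattern}}},
    ]


def extract_finished_matches(change):
    """Return the updated match objects, enriched with tournament_id and date_order."""
    doc = change.get("fullDocument")
    if not doc:  # no looked-up document (e.g. it was deleted in the meantime)
        return []
    updated = change.get("updateDescription", {}).get("updatedFields", {})
    results = []
    for key, value in updated.items():
        hit = WIN_KEY.match(key)
        if not hit:
            continue
        day = doc["matches"][int(hit.group(1))]
        match = dict(day["matches"][int(hit.group(2))])
        match["win"] = value  # value reported by this event
        match["tournament_id"] = doc["tournament_id"]
        match["date_order"] = day["date_order"]
        results.append(match)
    return results


def watch_finished_matches(uri="mongodb://localhost/mydb"):
    """Print each finished match; needs pymongo and a replica set deployment."""
    import pymongo  # imported here so the helpers above work without a server

    collection = pymongo.MongoClient(uri).mydb.match
    with collection.watch(win_filter_pipeline(), full_document="updateLookup") as stream:
        for change in stream:
            for match in extract_finished_matches(change):
                print(match)
```

Two caveats: replace events (as in the first output in the question) carry no updateDescription, so this filter skips them, and updateLookup returns the document as it is at lookup time, which may already include later updates.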