MongoDB 通过 ID 查询和更新嵌套数组中的单个对象?

MongoDB query and update single object inside nested array by ID?

以下面这个示例文档为例:

...
{
    "_id": ObjectId("5a934e000102030405000000"),
    "guild_id": 1234,
    "members": [
        ...
        {
            "member_id": 002,
            "name": "John Doe"
        },
        ...
    ]
}
...

我想从嵌套数组中取出单个成员对象,使查询返回如下结果:

{"member_id": 002, "name": "John Doe"}

这样我就可以用它来读取成员信息。

如何编写查询,找到 "member_id" 等于 002 的成员并得到上述结果?此外,又该如何更新该成员的 "name"?最好使用 Python 语法,但其他语法也可以。

聚合

// Return only the matching member sub-document from the nested "members" array.
db.collection.aggregate([
  {
    // Keep only documents that contain at least one member with this id,
    // so the $unwind below has fewer documents to expand.
    // NOTE(review): the sample document in the question stores member_id as a
    // number (002) while this filter uses the string "001" - the value and its
    // type must match what is actually stored.
    "$match": {
      "members.member_id": "001"
    }
  },
  {
    // Emit one document per element of the "members" array.
    "$unwind": "$members"
  },
  {
    // After unwinding, "members" is a single object; keep only the wanted one.
    "$match": {
      "members.member_id": "001"
    }
  },
  {
    // Promote the member object so it becomes the whole result document.
    "$replaceWith": "$members"
  }
])

mongoplayground


更新

// Change the "name" of the member whose member_id matches the filter.
db.collection.update({
  "members.member_id": "001"
},
{
  "$set": {
    // The positional "$" stands for the first array element matched by the filter.
    "members.$.name": "John"
  }
},
{
  // Update at most one document and do not insert one when nothing matches.
  "multi": false,
  "upsert": false
})

mongoplayground


Python(FastAPI)示例,请根据需要自行修改

from fastapi import FastAPI, Request, status, HTTPException
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pymongo import MongoClient
from fastapi.encoders import jsonable_encoder
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from pydantic import BaseModel

# Connection string for the local MongoDB instance used by this example.
CONNECTION_STRING = "mongodb://localhost"
try:
    # Give up on server selection after 3000 ms.
    client = MongoClient(CONNECTION_STRING, serverSelectionTimeoutMS=3000)
    # Querying the server info doubles as the connectivity check.
    print('Connected ! Mongo version is', client.server_info()['version'])
except Exception as exc:
    # Catch Exception instead of using a bare `except:` so KeyboardInterrupt
    # and SystemExit still propagate, and report why the check failed.
    # `client` stays bound if only the server_info() call failed.
    print('Disconnected !', exc)

# Create the FastAPI application and serve the local "static" directory
# under the /static URL prefix.
app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    """Turn a request-validation failure into a 422 JSON response.

    The payload carries the validation errors under ``detail`` and the
    request body that failed validation under ``body``.
    """
    payload = {"detail": exc.errors(), "body": exc.body}
    return JSONResponse(
        content=jsonable_encoder(payload),
        status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
    )

# Jinja2 templates are loaded from the "static" directory as well.
templates = Jinja2Templates(directory="static")

@app.get("/")
async def root():
    """Return the document whose ``_id`` is 2 from the ``key`` collection.

    Returns:
        ``{"message": <document>}`` when the document exists, otherwise
        ``{"message": None}``.
    """
    # NOTE(review): pymongo calls are synchronous, so this coroutine blocks
    # the event loop while the query runs - consider a plain `def` handler.
    db = get_database()
    # find() returns a lazy Cursor, which is always truthy and raises
    # IndexError on [0] when nothing matches. find_one() returns the document
    # itself or None, so a missing document no longer causes a server error.
    document = db['key'].find_one({"_id": 2})
    return {"message": document}

@app.get("/get/{id}")
async def root(id: int):
    """Look up documents in the ``key`` collection by ``_id``.

    Args:
        id: Integer ``_id`` taken from the URL path.

    Returns:
        ``{"message": [...]}`` holding the matching documents as a list,
        which is empty when nothing matches.
    """
    query = {"_id": id}
    cursor = get_database()['key'].find(query)
    # Materialise the cursor so the response body is a plain list.
    return {"message": list(cursor)}

def get_database():
    """Return the ``test`` database from the module-level Mongo client."""
    database = client['test']
    return database
    
# Script entry point. The guard lets other files import this module and
# reuse get_database() without running the code below.
if __name__ == "__main__":
    print("__main__ !")
    # Obtain a handle on the database.
    dbname = get_database()