MongoDB 通过 ID 查询和更新嵌套数组中的单个对象?
MongoDB query and update single object inside nested array by ID?
来自这个示例文档:
...
{
"_id": ObjectId("5a934e000102030405000000"),
"guild_id": 1234,
"members": [
...
{
"member_id": 002,
"name": "John Doe"
},
...
]
}
...
我想从嵌套数组中获取单个成员对象,以便返回我:
{"member_id": 002, "name": "John Doe"}
我可以用它来读取会员信息。
"member_id"
的一个查询如何与 002
一样并获得上述所需的结果?有了这个,您还如何更新成员 "name"
呢?请最好使用 Python 语法,但任何语法都可以。
汇总
db.collection.aggregate([
{
"$match": {
"members.member_id": "001"
}
},
{
"$unwind": "$members"
},
{
"$match": {
"members.member_id": "001"
}
},
{
"$replaceWith": "$members"
}
])
更新
db.collection.update({
"members.member_id": "001"
},
{
"$set": {
"members.$.name": "John"
}
},
{
"multi": false,
"upsert": false
})
python(fastapi) 自行编辑
from fastapi import FastAPI, Request, status, HTTPException
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pymongo import MongoClient
from fastapi.encoders import jsonable_encoder
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from pydantic import BaseModel
CONNECTION_STRING = "mongodb://localhost"
try:
client = MongoClient(CONNECTION_STRING, serverSelectionTimeoutMS=3000)
print('Connected ! Mongo version is', client.server_info()['version'])
except:
print('Disconnected !')
app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
return JSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content=jsonable_encoder({"detail": exc.errors(), "body": exc.body}),
)
templates = Jinja2Templates(directory="static")
@app.get("/")
async def root():
db = get_database()
c = db['key'].find({"_id":2})
if (c):
return {"message": c[0]}
return {"message": c}
@app.get("/get/{id}")
async def root(id: int):
db = get_database()
c = list(db['key'].find({"_id":id}))
return {"message": c}
def get_database():
return client['test']
# This is added so that many files can reuse the function get_database()
if __name__ == "__main__":
print('__main__ !')
# Get the database
dbname = get_database()
来自这个示例文档:
...
{
"_id": ObjectId("5a934e000102030405000000"),
"guild_id": 1234,
"members": [
...
{
"member_id": 002,
"name": "John Doe"
},
...
]
}
...
我想从嵌套数组中获取单个成员对象,以便返回我:
{"member_id": 002, "name": "John Doe"}
我可以用它来读取会员信息。
"member_id"
的一个查询如何与 002
一样并获得上述所需的结果?有了这个,您还如何更新成员 "name"
呢?请最好使用 Python 语法,但任何语法都可以。
汇总
db.collection.aggregate([
{
"$match": {
"members.member_id": "001"
}
},
{
"$unwind": "$members"
},
{
"$match": {
"members.member_id": "001"
}
},
{
"$replaceWith": "$members"
}
])
更新
db.collection.update({
"members.member_id": "001"
},
{
"$set": {
"members.$.name": "John"
}
},
{
"multi": false,
"upsert": false
})
python(fastapi) 自行编辑
from fastapi import FastAPI, Request, status, HTTPException
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pymongo import MongoClient
from fastapi.encoders import jsonable_encoder
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from pydantic import BaseModel
CONNECTION_STRING = "mongodb://localhost"
try:
client = MongoClient(CONNECTION_STRING, serverSelectionTimeoutMS=3000)
print('Connected ! Mongo version is', client.server_info()['version'])
except:
print('Disconnected !')
app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
return JSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content=jsonable_encoder({"detail": exc.errors(), "body": exc.body}),
)
templates = Jinja2Templates(directory="static")
@app.get("/")
async def root():
db = get_database()
c = db['key'].find({"_id":2})
if (c):
return {"message": c[0]}
return {"message": c}
@app.get("/get/{id}")
async def root(id: int):
db = get_database()
c = list(db['key'].find({"_id":id}))
return {"message": c}
def get_database():
return client['test']
# This is added so that many files can reuse the function get_database()
if __name__ == "__main__":
print('__main__ !')
# Get the database
dbname = get_database()