FastAPI - 在 API 调用时调用函数
FastAPI - Invoke Function on API Call
我目前构建了一个 API,它有一个需要 ID 和时间戳的用户模式:
def ts():
d = datetime.datetime.now()
return datetime.datetime.timestamp(d)*1000
def generate_id():
first = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(8))
second = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(4))
third = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(4))
fourth = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(4))
fifth = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(12))
_id = f'{first}-{second}-{third}-{fourth}-{fifth}'
return _id
class User(BaseModel):
id: str = generate_id()
username: str
created_timestamp: Optional[str] = ts()
这是我创建用户的函数:
def create_user(db: Session, user: schemas.User):
db_item = models.User(**user.dict())
db.add(db_item)
db.commit()
db.refresh(db_item)
return db_item
# Endpoint to create a new attribute
@app.post("/entity/", response_model=schemas.User)
def create_user(user: schemas.User, db: Session = Depends(get_db)):
db_user = crud.get_id(db, id=user.id)
if db_user is not None:
raise HTTPException(status_code=400, detail="ID schon vergeben")
mail_user = crud.get_email(db, email=user.email)
if mail_user is not None:
raise HTTPException(status_code=400, detail="Mail bereits vergeben")
return crud.create_user(db=db, user=user)
端点本身可以工作,但是当我发出 POST
请求时,第二次出现错误,因为 ID 没有“刷新”。时间戳也保持不变并且已过时。该函数似乎在服务器启动时被调用。有没有办法在每次 API 调用时触发该函数?
您必须使用 the default_factory
functionality of pydantic。
id: str = Field(default_factory=generate_id)
这将在您创建对象并将 return 值分配给 id
属性时调用函数 generate_id
。
我目前构建了一个 API,它有一个需要 ID 和时间戳的用户模式:
def ts():
d = datetime.datetime.now()
return datetime.datetime.timestamp(d)*1000
def generate_id():
first = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(8))
second = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(4))
third = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(4))
fourth = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(4))
fifth = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(12))
_id = f'{first}-{second}-{third}-{fourth}-{fifth}'
return _id
class User(BaseModel):
id: str = generate_id()
username: str
created_timestamp: Optional[str] = ts()
这是我创建用户的函数:
def create_user(db: Session, user: schemas.User):
db_item = models.User(**user.dict())
db.add(db_item)
db.commit()
db.refresh(db_item)
return db_item
# Endpoint to create a new attribute
@app.post("/entity/", response_model=schemas.User)
def create_user(user: schemas.User, db: Session = Depends(get_db)):
db_user = crud.get_id(db, id=user.id)
if db_user is not None:
raise HTTPException(status_code=400, detail="ID schon vergeben")
mail_user = crud.get_email(db, email=user.email)
if mail_user is not None:
raise HTTPException(status_code=400, detail="Mail bereits vergeben")
return crud.create_user(db=db, user=user)
端点本身可以工作,但是当我发出 POST
请求时,第二次出现错误,因为 ID 没有“刷新”。时间戳也保持不变并且已过时。该函数似乎在服务器启动时被调用。有没有办法在每次 API 调用时触发该函数?
您必须使用 the default_factory
functionality of pydantic。
id: str = Field(default_factory=generate_id)
这将在您创建对象并将 return 值分配给 id
属性时调用函数 generate_id
。