处理 fastapi 中的令牌过期
Handling the token expiration in fastapi
我是 fastapi 安全方面的新手,我正在尝试实施身份验证,然后使用范围。
问题是我正在为令牌设置过期时间,但过期后用户仍然通过身份验证并可以访问服务
import json
from jose import jwt,JWTError
from typing import Optional
from datetime import datetime,timedelta
from fastapi.security import OAuth2PasswordBearer,OAuth2PasswordRequestForm,SecurityScopes
from fastapi import APIRouter, UploadFile, File, Depends, HTTPException,status
from tinydb import TinyDB,where
from tinydb import Query
from passlib.hash import bcrypt
from pydantic import BaseModel
from passlib.context import CryptContext
##
class TokenData(BaseModel):
    """Claims extracted from a decoded access token."""

    # Value taken from the token's "sub" claim; None when it is absent.
    username: Optional[str] = None
class Token(BaseModel):
    """Response body of the token endpoint."""

    access_token: str  # the encoded JWT
    token_type: str  # token scheme label, e.g. "bearer"
# Router that the endpoints in this module are registered on.
router = APIRouter()

# JWT signing configuration.
# NOTE(review): the signing key is hard-coded in source; consider loading it
# from the environment or a secret store before deploying.
SECRET_KEY = "e79b2a1eaa2b801bc81c49127ca4607749cc2629f73518194f528fc5c8491713"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 1  # lifetime of issued access tokens, in minutes

# Dependency that supplies the bearer token to protected endpoints; tokenUrl
# is the login URL advertised to clients.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/dev-service/api/v1/openvpn/token")

# TinyDB storage: user records are kept in the "User" table of app/Users.json.
db = TinyDB("app/Users.json")
Users = db.table("User")

# NOTE(review): this alias is rebound by the `User` model class declared right
# after this section, so it appears to have no lasting effect -- confirm.
User = Query

# NOTE(review): the visible code hashes and verifies with passlib.hash.bcrypt
# directly; this context does not appear to be used -- confirm.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
class User(BaseModel):
    """A user name together with its password."""

    username: str
    password: str
def get_user(username: str):
    """Return the stored record for *username*, or None when none exists.

    Records keep the user name under the ``'name'`` key, so the lookup
    filters on that field using the ``username`` argument.

    Args:
        username: name of the user to look up.

    Returns:
        The first matching record, or None if the search found nothing.
    """
    matches = Users.search(where('name') == username)
    if matches:
        return matches[0]
    return None
@router.post('/verif')
async def verify_user(name, password):
    """Check *password* against the stored bcrypt hash of user *name*.

    Args:
        name: user name to look up in the ``'name'`` field.
        password: plain-text password to verify.

    Returns:
        The list of matching records when the password is correct, or
        False when the user is unknown or the password does not match.
    """
    # NOTE(review): this helper is also exposed as an endpoint and returns
    # the raw records, which include the password hash -- confirm intended.
    records = Users.search(where('name') == name)
    if not records:
        return False
    # The records are deliberately not printed: they contain the password
    # hash, which must not end up on stdout or in logs.
    stored_hash = records[0]['password']
    if not bcrypt.verify(password, stored_hash):
        return False
    return records
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Encode *data* plus an ``exp`` claim as a signed JWT.

    Args:
        data: claims to embed; a copy is made, the argument is not modified.
        expires_delta: token lifetime. A falsy value (None or a zero
            duration) falls back to one minute.

    Returns:
        The token produced by ``jwt.encode`` with SECRET_KEY and ALGORITHM.
    """
    lifetime = expires_delta if expires_delta else timedelta(minutes=1)
    claims = data.copy()
    claims["exp"] = datetime.utcnow() + lifetime
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
@router.post("/token", response_model=Token)
async def token_generate(form_data: OAuth2PasswordRequestForm = Depends()):
    """Exchange a username/password form for a bearer access token.

    Returns:
        A dict with ``access_token`` and ``token_type`` keys.

    Raises:
        HTTPException: 401 when ``verify_user`` rejects the credentials.
    """
    authenticated = await verify_user(form_data.username, form_data.password)
    if authenticated:
        lifetime = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
        jwt_token = create_access_token(
            data={"sub": form_data.username},
            expires_delta=lifetime,
        )
        return {"access_token": jwt_token, "token_type": "bearer"}
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Incorrect username or password",
        headers={"WWW-Authenticate": "Bearer"},
    )
@router.get('/user/me')
async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Resolve the bearer *token* to the stored user record(s).

    Returns:
        The list of records whose ``'name'`` equals the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded or validated,
            has no ``sub`` claim, or names a user that is not stored.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # python-jose validates the "exp" claim during decode by default; an
        # expired token raises ExpiredSignatureError, a JWTError subclass.
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise credentials_exception
    username = payload.get("sub")
    if username is None:
        raise credentials_exception
    token_data = TokenData(username=username)
    user = Users.search(where('name') == token_data.username)
    # search() returns a list (empty when nothing matches), never None, so
    # the check has to be on emptiness to reject unknown users.
    if not user:
        raise credentials_exception
    # NOTE(review): the returned records include the password hash.
    return user
@router.post('/user')
async def create_user(name, password):
    """Store a user record holding *name* and the bcrypt hash of *password*.

    Returns:
        Always True.
    """
    record = {'name': name, 'password': bcrypt.hash(password)}
    Users.insert(record)
    return True
如何才能让令牌的过期时间真正生效,以及如何添加范围(scopes)?
当我刚开始使用 FastAPI 时,也有过同样的困惑。您创建的访问令牌不会自行失效,因此需要在 get_current_user 中验证令牌时检查它是否已过期。您可以将 TokenData 模型修改为以下代码:
class TokenData(BaseModel):
    """Claims extracted from a decoded access token."""

    # Value taken from the token's "sub" claim; None when it is absent.
    username: Optional[str] = None
    # Expiry taken from the token's "exp" claim. The explicit default keeps
    # the field optional, consistent with ``username``, on every pydantic
    # version (without it, newer versions treat the field as required).
    expires: Optional[datetime] = None
并将您的 get_current_user 修改为:
@router.get('/user/me')
async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Resolve the bearer *token* to the stored user record(s).

    Returns:
        The list of records whose ``'name'`` equals the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded or validated,
            lacks a ``sub`` or ``exp`` claim, is expired, or names a user
            that is not stored.
    """
    from datetime import timezone

    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # Decode the token and extract the username and expiry claims.
        # python-jose already validates "exp" here by default and raises
        # ExpiredSignatureError (a JWTError subclass) for expired tokens.
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise credentials_exception
    username = payload.get("sub")
    expires = payload.get("exp")
    if username is None or expires is None:
        raise credentials_exception
    # "exp" is a numeric Unix timestamp. Comparing plain numbers avoids the
    # TypeError raised when a naive utcnow() is compared with the
    # timezone-aware datetime pydantic builds from that number.
    if datetime.now(timezone.utc).timestamp() > expires:
        raise credentials_exception
    token_data = TokenData(username=username, expires=expires)
    user = Users.search(where('name') == token_data.username)
    # search() returns a list (empty when nothing matches), never None, so
    # the check has to be on emptiness to reject unknown users.
    if not user:
        raise credentials_exception
    # NOTE(review): the returned records include the password hash.
    return user
Scopes 本身是一个很大的话题,无法在这里展开介绍。不过,您可以在 FastAPI 官方文档的安全(OAuth2 scopes)章节中阅读更多相关信息。
上面的答案没有说明 token_data.expires 需要转换为 utc 日期时间对象。
# check token expiration
if expires is None:
raise credentials_exception
if datetime.utcnow() > datetime.utcfromtimestamp(token_data.expires):
raise credentials_exception
return user
我本想在 Unyime Etim 的回答下发表评论,但我的声望值还不够,所以单独写成一个回答。
我只想补充一点:jwt.decode 内置了对 “exp” 声明的检查,并且默认启用(参见 https://github.com/mpdavis/python-jose/blob/96474ecfb6ad3ce16f41b0814ab5126d58725e2a/jose/jwt.py#L82)。
因此,要判断令牌是否已过期,只需处理相应的异常 ExpiredSignatureError(需要先导入:from jose import ExpiredSignatureError):
try:
# decode token and extract username and expires data
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
except ExpiredSignatureError: # <---- this one
raise HTTPException(status_code=403, detail="token has been expired")
except JWTError:
raise credentials_exception
我是 fastapi 安全方面的新手,我正在尝试实施身份验证,然后使用范围。
问题是我正在为令牌设置过期时间,但过期后用户仍然通过身份验证并可以访问服务
import json
from jose import jwt,JWTError
from typing import Optional
from datetime import datetime,timedelta
from fastapi.security import OAuth2PasswordBearer,OAuth2PasswordRequestForm,SecurityScopes
from fastapi import APIRouter, UploadFile, File, Depends, HTTPException,status
from tinydb import TinyDB,where
from tinydb import Query
from passlib.hash import bcrypt
from pydantic import BaseModel
from passlib.context import CryptContext
##
class TokenData(BaseModel):
    """Claims extracted from a decoded access token."""

    # Value taken from the token's "sub" claim; None when it is absent.
    username: Optional[str] = None
class Token(BaseModel):
    """Response body of the token endpoint."""

    access_token: str  # the encoded JWT
    token_type: str  # token scheme label, e.g. "bearer"
# Router that the endpoints in this module are registered on.
router = APIRouter()

# JWT signing configuration.
# NOTE(review): the signing key is hard-coded in source; consider loading it
# from the environment or a secret store before deploying.
SECRET_KEY = "e79b2a1eaa2b801bc81c49127ca4607749cc2629f73518194f528fc5c8491713"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 1  # lifetime of issued access tokens, in minutes

# Dependency that supplies the bearer token to protected endpoints; tokenUrl
# is the login URL advertised to clients.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/dev-service/api/v1/openvpn/token")

# TinyDB storage: user records are kept in the "User" table of app/Users.json.
db = TinyDB("app/Users.json")
Users = db.table("User")

# NOTE(review): this alias is rebound by the `User` model class declared right
# after this section, so it appears to have no lasting effect -- confirm.
User = Query

# NOTE(review): the visible code hashes and verifies with passlib.hash.bcrypt
# directly; this context does not appear to be used -- confirm.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
class User(BaseModel):
    """A user name together with its password."""

    username: str
    password: str
def get_user(username: str):
    """Return the stored record for *username*, or None when none exists.

    Records keep the user name under the ``'name'`` key, so the lookup
    filters on that field using the ``username`` argument.

    Args:
        username: name of the user to look up.

    Returns:
        The first matching record, or None if the search found nothing.
    """
    matches = Users.search(where('name') == username)
    if matches:
        return matches[0]
    return None
@router.post('/verif')
async def verify_user(name, password):
    """Check *password* against the stored bcrypt hash of user *name*.

    Args:
        name: user name to look up in the ``'name'`` field.
        password: plain-text password to verify.

    Returns:
        The list of matching records when the password is correct, or
        False when the user is unknown or the password does not match.
    """
    # NOTE(review): this helper is also exposed as an endpoint and returns
    # the raw records, which include the password hash -- confirm intended.
    records = Users.search(where('name') == name)
    if not records:
        return False
    # The records are deliberately not printed: they contain the password
    # hash, which must not end up on stdout or in logs.
    stored_hash = records[0]['password']
    if not bcrypt.verify(password, stored_hash):
        return False
    return records
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Encode *data* plus an ``exp`` claim as a signed JWT.

    Args:
        data: claims to embed; a copy is made, the argument is not modified.
        expires_delta: token lifetime. A falsy value (None or a zero
            duration) falls back to one minute.

    Returns:
        The token produced by ``jwt.encode`` with SECRET_KEY and ALGORITHM.
    """
    lifetime = expires_delta if expires_delta else timedelta(minutes=1)
    claims = data.copy()
    claims["exp"] = datetime.utcnow() + lifetime
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
@router.post("/token", response_model=Token)
async def token_generate(form_data: OAuth2PasswordRequestForm = Depends()):
    """Exchange a username/password form for a bearer access token.

    Returns:
        A dict with ``access_token`` and ``token_type`` keys.

    Raises:
        HTTPException: 401 when ``verify_user`` rejects the credentials.
    """
    authenticated = await verify_user(form_data.username, form_data.password)
    if authenticated:
        lifetime = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
        jwt_token = create_access_token(
            data={"sub": form_data.username},
            expires_delta=lifetime,
        )
        return {"access_token": jwt_token, "token_type": "bearer"}
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Incorrect username or password",
        headers={"WWW-Authenticate": "Bearer"},
    )
@router.get('/user/me')
async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Resolve the bearer *token* to the stored user record(s).

    Returns:
        The list of records whose ``'name'`` equals the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded or validated,
            has no ``sub`` claim, or names a user that is not stored.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # python-jose validates the "exp" claim during decode by default; an
        # expired token raises ExpiredSignatureError, a JWTError subclass.
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise credentials_exception
    username = payload.get("sub")
    if username is None:
        raise credentials_exception
    token_data = TokenData(username=username)
    user = Users.search(where('name') == token_data.username)
    # search() returns a list (empty when nothing matches), never None, so
    # the check has to be on emptiness to reject unknown users.
    if not user:
        raise credentials_exception
    # NOTE(review): the returned records include the password hash.
    return user
@router.post('/user')
async def create_user(name, password):
    """Store a user record holding *name* and the bcrypt hash of *password*.

    Returns:
        Always True.
    """
    record = {'name': name, 'password': bcrypt.hash(password)}
    Users.insert(record)
    return True
如何才能让令牌的过期时间真正生效,以及如何添加范围(scopes)?
当我刚开始使用 FastAPI 时,也有过同样的困惑。您创建的访问令牌不会自行失效,因此需要在 get_current_user 中验证令牌时检查它是否已过期。您可以将 TokenData 模型修改为以下代码:
class TokenData(BaseModel):
    """Claims extracted from a decoded access token."""

    # Value taken from the token's "sub" claim; None when it is absent.
    username: Optional[str] = None
    # Expiry taken from the token's "exp" claim. The explicit default keeps
    # the field optional, consistent with ``username``, on every pydantic
    # version (without it, newer versions treat the field as required).
    expires: Optional[datetime] = None
并将您的 get_current_user 修改为:
@router.get('/user/me')
async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Resolve the bearer *token* to the stored user record(s).

    Returns:
        The list of records whose ``'name'`` equals the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded or validated,
            lacks a ``sub`` or ``exp`` claim, is expired, or names a user
            that is not stored.
    """
    from datetime import timezone

    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # Decode the token and extract the username and expiry claims.
        # python-jose already validates "exp" here by default and raises
        # ExpiredSignatureError (a JWTError subclass) for expired tokens.
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise credentials_exception
    username = payload.get("sub")
    expires = payload.get("exp")
    if username is None or expires is None:
        raise credentials_exception
    # "exp" is a numeric Unix timestamp. Comparing plain numbers avoids the
    # TypeError raised when a naive utcnow() is compared with the
    # timezone-aware datetime pydantic builds from that number.
    if datetime.now(timezone.utc).timestamp() > expires:
        raise credentials_exception
    token_data = TokenData(username=username, expires=expires)
    user = Users.search(where('name') == token_data.username)
    # search() returns a list (empty when nothing matches), never None, so
    # the check has to be on emptiness to reject unknown users.
    if not user:
        raise credentials_exception
    # NOTE(review): the returned records include the password hash.
    return user
Scopes 本身是一个很大的话题,无法在这里展开介绍。不过,您可以在 FastAPI 官方文档的安全(OAuth2 scopes)章节中阅读更多相关信息。
上面的答案没有说明 token_data.expires 需要转换为 UTC 日期时间对象。
# check token expiration
if expires is None:
raise credentials_exception
if datetime.utcnow() > datetime.utcfromtimestamp(token_data.expires):
raise credentials_exception
return user
我本想在 Unyime Etim 的回答下发表评论,但我的声望值还不够,所以单独写成一个回答。
我只想补充一点:jwt.decode 内置了对 “exp” 声明的检查,并且默认启用(参见 https://github.com/mpdavis/python-jose/blob/96474ecfb6ad3ce16f41b0814ab5126d58725e2a/jose/jwt.py#L82)。
因此,要判断令牌是否已过期,只需处理相应的异常 ExpiredSignatureError(需要先导入:from jose import ExpiredSignatureError):
try:
# decode token and extract username and expires data
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
except ExpiredSignatureError: # <---- this one
raise HTTPException(status_code=403, detail="token has been expired")
except JWTError:
raise credentials_exception