如何使用基于 JWT 令牌的授权来保护 fastapi API 端点?
How to secure fastapi API endpoint with JWT Token based authorization?
我对 python 中的 FastAPI 有点陌生。我正在构建一个 API 后端框架,需要具有基于 JWT 令牌的授权。现在,我知道如何生成 JWT 令牌,但不确定如何将其与 Python 中快速 api 中的 API 方法集成。任何指针将不胜感激。
使用 Depends and Response Model
可以轻松地将其集成到 API 方法中
所以让我举个例子,假设您正在部署您的 ML 模型,并且您将添加一些安全性,在您的情况下您已经创建了令牌部分:
TL DR
class User(BaseModel):
pass
...
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
...
async def get_current_user(token: str = Depends(oauth2_scheme)): # You created a function that depends on oauth2_scheme
pass
...
@app.get("/users/me/models/")
async def read_own_items(current_user: User = Depends(get_current_active_user)):
pass
一些例子
Pydantic 架构
class Url(BaseModel):
url: str
class AuthorizationResponse(BaseModel):
pass
class User(BaseModel):
pass
class AuthUser(BaseModel):
pass
class Token(BaseModel):
pass
您的应用程序
LOGIN_URL = "https://example.com/login/oauth/authorize"
REDIRECT_URL = f"{app}/auth/app"
...
@app.get("/login")
def get_login_url() -> Url:
return Url(url=f"{LOGIN_URL}?{urlencode(some_params_here)}")
@app.post("/authorize")
async def verify_authorization(body: AuthorizationResponse, db: Session = Depends(some_database_fetch)) -> Token:
return Token(access_token=access_token, token_type="bearer", user=User)
def create_access_token(*, data: User, expire_time: int = None) -> bytes:
return encoded_jwt
def get_user_from_header(*, authorization: str = Header(None)) -> User: # from fastapi import Header
return token_data #Token data = User(**payload)
@app.get("/me", response_model=User)
def read_profile(user: User = Depends(get_user_from_header), db: Session = Depends(some_database_fetch),) -> DbUser:
return db_user
上面例子的总结
- 我们创建了一个
LOGIN_URL
,然后为该 URL 创建了一个 Pydantic 架构
- 然后我们为后端创建了
/authorize
端点来检查它并从用户那里得到它需要的一切 API
- 用于创建简单 JWT 令牌的函数
create_access_token
- 通过我们刚刚创建的 JWT 令牌,我们可以创建一个依赖项
get_user_from_header
以在一些私有端点中使用
Sebastian Ramirez(FastAPI 的创建者) 有一个很棒的视频,展示了如何向您的应用添加基本授权 FastAPI - Basic HTTP Auth
FastAPI 有一个很好的文档,oauth2-jwt:
对于一些现实世界的例子,fastapi-users 有一个完美的 JWT 身份验证后端。
在朋友和同事的帮助下,我解决了这个问题,并想与社区分享这个解决方案。这是现在的样子:
Python代码----
import json
import os
import datetime
from fastapi import HTTPException, Header
from urllib.request import urlopen
from jose import jwt
from jose import exceptions as JoseExceptions
from utils import logger
AUTH0_DOMAIN = os.environ.get(
'AUTH0_DOMAIN', 'https://<domain>/<tenant-id>/')
AUTH0_ISSUER = os.environ.get(
'AUTO0_ISSUER', 'https://sts.windows.net/<tenant>/')
AUTH0_API_AUDIENCE = os.environ.get(
'AUTH0_API_AUDIENCE', '<audience url>')
AZURE_OPENID_CONFIG = os.environ.get(
'AZURE_OPENID_CONFIG', 'https://login.microsoftonline.com/common/.well-known/openid-configuration')
def get_token_auth_header(authorization):
parts = authorization.split()
if parts[0].lower() != "bearer":
raise HTTPException(
status_code=401,
detail='Authorization header must start with Bearer')
elif len(parts) == 1:
raise HTTPException(
status_code=401,
detail='Authorization token not found')
elif len(parts) > 2:
raise HTTPException(
status_code=401,
detail='Authorization header be Bearer token')
token = parts[1]
return token
def get_payload(unverified_header, token, jwks_properties):
try:
payload = jwt.decode(
token,
key=jwks_properties["jwks"],
algorithms=jwks_properties["algorithms"], # ["RS256"] typically
audience=AUTH0_API_AUDIENCE,
issuer=AUTH0_ISSUER
)
except jwt.ExpiredSignatureError:
raise HTTPException(
status_code=401,
detail='Authorization token expired')
except jwt.JWTClaimsError:
raise HTTPException(
status_code=401,
detail='Incorrect claims, check the audience and issuer.')
except Exception:
raise HTTPException(
status_code=401,
detail='Unable to parse authentication token')
return payload
class AzureJWKS:
def __init__(self, openid_config: str=AZURE_OPENID_CONFIG):
self.openid_url = openid_config
self._jwks = None
self._signing_algorithms = []
self._last_updated = datetime.datetime(2000, 1, 1, 12, 0, 0)
def _refresh_cache(self):
openid_reader = urlopen(self.openid_url)
azure_config = json.loads(openid_reader.read())
self._signing_algorithms = azure_config["id_token_signing_alg_values_supported"]
jwks_url = azure_config["jwks_uri"]
jwks_reader = urlopen(jwks_url)
self._jwks = json.loads(jwks_reader.read())
logger.info(f"Refreshed jwks config from {jwks_url}.")
logger.info("Supported token signing algorithms: {}".format(str(self._signing_algorithms)))
self._last_updated = datetime.datetime.now()
def get_jwks(self, cache_hours: int=24):
logger.info("jwks config is out of date (last updated at {})".format(str(self._last_updated)))
self._refresh_cache()
return {'jwks': self._jwks, 'algorithms': self._signing_algorithms}
jwks_config = AzureJWKS()
async def require_auth(token: str = Header(...)):
token = get_token_auth_header(token)
try:
unverified_header = jwt.get_unverified_header(token)
except JoseExceptions.JWTError:
raise HTTPException(
status_code=401,
detail='Unable to decode authorization token headers')
payload = get_payload(unverified_header, token, jwks_config.get_jwks())
if not payload:
raise HTTPException(
status_code=401,
detail='Invalid authorization token')
return payload
希望社区能从中受益!
我发现可以对已接受的答案进行某些改进:
- 如果您选择使用 HTTPBearer 安全模式,授权 header 内容的格式会自动验证,并且不需要像已接受的答案
get_token_auth_header
中那样的函数。此外,关于身份验证,生成的文档最终非常清晰和解释:
- 当您解码令牌时,您可以捕获所有属于 class
JOSEError
后代的异常并打印它们的消息,避免捕获特定异常并编写自定义消息
- 好处:在 jwt 解码方法中,您可以指定要忽略的声明,因为您不想验证它们
示例片段:
哪里...
/endpoints
- hello.py
- __init__.p
dependency.py
main.py
# dependency.py script
from jose import jwt
from jose.exceptions import JOSEError
from fastapi import HTTPException, Depends
from fastapi.security import HTTPBasicCredentials, HTTPBearer
security = HTTPBearer()
async def has_access(credentials: HTTPBasicCredentials = Depends(security)):
"""
Function that is used to validate the token in the case that it requires it
"""
token = credentials.credentials
try:
payload = jwt.decode(token, key='secret', options={"verify_signature": False,
"verify_aud": False,
"verify_iss": False})
print("payload => ", payload)
except JOSEError as e: # catches any exception
raise HTTPException(
status_code=401,
detail=str(e))
# main.py script
from fastapi import FastAPI, Depends
from endpoints import hello
from dependency import has_access
app = FastAPI()
# routes
PROTECTED = [Depends(has_access)]
app.include_router(
hello.router,
prefix="/hello",
dependencies=PROTECTED
)
# hello.py script
from fastapi import APIRouter
router = APIRouter()
@router.get("")
async def say_hi(name: str):
return "Hi " + name
通过利用所有提到的功能,您最终构建了一个安全性超快的 API:)
我对 python 中的 FastAPI 有点陌生。我正在构建一个 API 后端框架,需要具有基于 JWT 令牌的授权。现在,我知道如何生成 JWT 令牌,但不确定如何将其与 Python 中快速 api 中的 API 方法集成。任何指针将不胜感激。
使用 Depends and Response Model
可以轻松地将其集成到 API 方法中所以让我举个例子,假设您正在部署您的 ML 模型,并且您将添加一些安全性,在您的情况下您已经创建了令牌部分:
TL DR
class User(BaseModel):
pass
...
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
...
async def get_current_user(token: str = Depends(oauth2_scheme)): # You created a function that depends on oauth2_scheme
pass
...
@app.get("/users/me/models/")
async def read_own_items(current_user: User = Depends(get_current_active_user)):
pass
一些例子
Pydantic 架构
class Url(BaseModel):
url: str
class AuthorizationResponse(BaseModel):
pass
class User(BaseModel):
pass
class AuthUser(BaseModel):
pass
class Token(BaseModel):
pass
您的应用程序
LOGIN_URL = "https://example.com/login/oauth/authorize"
REDIRECT_URL = f"{app}/auth/app"
...
@app.get("/login")
def get_login_url() -> Url:
return Url(url=f"{LOGIN_URL}?{urlencode(some_params_here)}")
@app.post("/authorize")
async def verify_authorization(body: AuthorizationResponse, db: Session = Depends(some_database_fetch)) -> Token:
return Token(access_token=access_token, token_type="bearer", user=User)
def create_access_token(*, data: User, expire_time: int = None) -> bytes:
return encoded_jwt
def get_user_from_header(*, authorization: str = Header(None)) -> User: # from fastapi import Header
return token_data #Token data = User(**payload)
@app.get("/me", response_model=User)
def read_profile(user: User = Depends(get_user_from_header), db: Session = Depends(some_database_fetch),) -> DbUser:
return db_user
上面例子的总结
- 我们创建了一个
LOGIN_URL
,然后为该 URL 创建了一个 Pydantic 架构
- 然后我们为后端创建了
/authorize
端点来检查它并从用户那里得到它需要的一切 API - 用于创建简单 JWT 令牌的函数
create_access_token
- 通过我们刚刚创建的 JWT 令牌,我们可以创建一个依赖项
get_user_from_header
以在一些私有端点中使用
Sebastian Ramirez(FastAPI 的创建者) 有一个很棒的视频,展示了如何向您的应用添加基本授权 FastAPI - Basic HTTP Auth
FastAPI 有一个很好的文档,oauth2-jwt:
对于一些现实世界的例子,fastapi-users 有一个完美的 JWT 身份验证后端。
在朋友和同事的帮助下,我解决了这个问题,并想与社区分享这个解决方案。这是现在的样子:
Python代码----
import json
import os
import datetime
from fastapi import HTTPException, Header
from urllib.request import urlopen
from jose import jwt
from jose import exceptions as JoseExceptions
from utils import logger
AUTH0_DOMAIN = os.environ.get(
'AUTH0_DOMAIN', 'https://<domain>/<tenant-id>/')
AUTH0_ISSUER = os.environ.get(
'AUTO0_ISSUER', 'https://sts.windows.net/<tenant>/')
AUTH0_API_AUDIENCE = os.environ.get(
'AUTH0_API_AUDIENCE', '<audience url>')
AZURE_OPENID_CONFIG = os.environ.get(
'AZURE_OPENID_CONFIG', 'https://login.microsoftonline.com/common/.well-known/openid-configuration')
def get_token_auth_header(authorization):
parts = authorization.split()
if parts[0].lower() != "bearer":
raise HTTPException(
status_code=401,
detail='Authorization header must start with Bearer')
elif len(parts) == 1:
raise HTTPException(
status_code=401,
detail='Authorization token not found')
elif len(parts) > 2:
raise HTTPException(
status_code=401,
detail='Authorization header be Bearer token')
token = parts[1]
return token
def get_payload(unverified_header, token, jwks_properties):
try:
payload = jwt.decode(
token,
key=jwks_properties["jwks"],
algorithms=jwks_properties["algorithms"], # ["RS256"] typically
audience=AUTH0_API_AUDIENCE,
issuer=AUTH0_ISSUER
)
except jwt.ExpiredSignatureError:
raise HTTPException(
status_code=401,
detail='Authorization token expired')
except jwt.JWTClaimsError:
raise HTTPException(
status_code=401,
detail='Incorrect claims, check the audience and issuer.')
except Exception:
raise HTTPException(
status_code=401,
detail='Unable to parse authentication token')
return payload
class AzureJWKS:
def __init__(self, openid_config: str=AZURE_OPENID_CONFIG):
self.openid_url = openid_config
self._jwks = None
self._signing_algorithms = []
self._last_updated = datetime.datetime(2000, 1, 1, 12, 0, 0)
def _refresh_cache(self):
openid_reader = urlopen(self.openid_url)
azure_config = json.loads(openid_reader.read())
self._signing_algorithms = azure_config["id_token_signing_alg_values_supported"]
jwks_url = azure_config["jwks_uri"]
jwks_reader = urlopen(jwks_url)
self._jwks = json.loads(jwks_reader.read())
logger.info(f"Refreshed jwks config from {jwks_url}.")
logger.info("Supported token signing algorithms: {}".format(str(self._signing_algorithms)))
self._last_updated = datetime.datetime.now()
def get_jwks(self, cache_hours: int=24):
logger.info("jwks config is out of date (last updated at {})".format(str(self._last_updated)))
self._refresh_cache()
return {'jwks': self._jwks, 'algorithms': self._signing_algorithms}
jwks_config = AzureJWKS()
async def require_auth(token: str = Header(...)):
token = get_token_auth_header(token)
try:
unverified_header = jwt.get_unverified_header(token)
except JoseExceptions.JWTError:
raise HTTPException(
status_code=401,
detail='Unable to decode authorization token headers')
payload = get_payload(unverified_header, token, jwks_config.get_jwks())
if not payload:
raise HTTPException(
status_code=401,
detail='Invalid authorization token')
return payload
希望社区能从中受益!
我发现可以对已接受的答案进行某些改进:
- 如果您选择使用 HTTPBearer 安全模式,授权 header 内容的格式会自动验证,并且不需要像已接受的答案
get_token_auth_header
中那样的函数。此外,关于身份验证,生成的文档最终非常清晰和解释:
- 当您解码令牌时,您可以捕获所有属于 class
JOSEError
后代的异常并打印它们的消息,避免捕获特定异常并编写自定义消息 - 好处:在 jwt 解码方法中,您可以指定要忽略的声明,因为您不想验证它们
示例片段: 哪里...
/endpoints
- hello.py
- __init__.p
dependency.py
main.py
# dependency.py script
from jose import jwt
from jose.exceptions import JOSEError
from fastapi import HTTPException, Depends
from fastapi.security import HTTPBasicCredentials, HTTPBearer
security = HTTPBearer()
async def has_access(credentials: HTTPBasicCredentials = Depends(security)):
"""
Function that is used to validate the token in the case that it requires it
"""
token = credentials.credentials
try:
payload = jwt.decode(token, key='secret', options={"verify_signature": False,
"verify_aud": False,
"verify_iss": False})
print("payload => ", payload)
except JOSEError as e: # catches any exception
raise HTTPException(
status_code=401,
detail=str(e))
# main.py script
from fastapi import FastAPI, Depends
from endpoints import hello
from dependency import has_access
app = FastAPI()
# routes
PROTECTED = [Depends(has_access)]
app.include_router(
hello.router,
prefix="/hello",
dependencies=PROTECTED
)
# hello.py script
from fastapi import APIRouter
router = APIRouter()
@router.get("")
async def say_hi(name: str):
return "Hi " + name
通过利用所有提到的功能,您最终构建了一个安全性超快的 API:)