如何使用基于 JWT 令牌的授权来保护 fastapi API 端点?

How to secure fastapi API endpoint with JWT Token based authorization?

我对 python 中的 FastAPI 有点陌生。我正在构建一个 API 后端框架,需要具有基于 JWT 令牌的授权。现在,我知道如何生成 JWT 令牌,但不确定如何将其与 Python 中快速 api 中的 API 方法集成。任何指针将不胜感激。

使用 Depends and Response Model

可以轻松地将其集成到 API 方法中

所以让我举个例子,假设您正在部署您的 ML 模型,并且您将添加一些安全性,在您的情况下您已经创建了令牌部分:

TL DR

class User(BaseModel):
    pass
...
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
...
async def get_current_user(token: str = Depends(oauth2_scheme)): # You created a function that depends on oauth2_scheme
    pass
...
@app.get("/users/me/models/")
async def read_own_items(current_user: User = Depends(get_current_active_user)):
   pass

一些例子

Pydantic 架构

class Url(BaseModel):
    url: str

class AuthorizationResponse(BaseModel):
    pass

class User(BaseModel):
    pass

class AuthUser(BaseModel):
    pass

class Token(BaseModel):
    pass

您的应用程序

LOGIN_URL = "https://example.com/login/oauth/authorize"
REDIRECT_URL = f"{app}/auth/app"
...
@app.get("/login")
def get_login_url() -> Url:
    return Url(url=f"{LOGIN_URL}?{urlencode(some_params_here)}")

@app.post("/authorize")
async def verify_authorization(body: AuthorizationResponse, db: Session = Depends(some_database_fetch)) -> Token:
    return Token(access_token=access_token, token_type="bearer", user=User)

def create_access_token(*, data: User, expire_time: int = None) -> bytes:
    return encoded_jwt

def get_user_from_header(*, authorization: str = Header(None)) -> User: # from fastapi import Header
    return token_data   #Token data = User(**payload)

@app.get("/me", response_model=User)
def read_profile(user: User = Depends(get_user_from_header), db: Session = Depends(some_database_fetch),) -> DbUser:
    return db_user

上面例子的总结

  1. 我们创建了一个 LOGIN_URL,然后为该 URL
  2. 创建了一个 Pydantic 架构
  3. 然后我们为后端创建了 /authorize 端点来检查它并从用户那里得到它需要的一切 API
  4. 用于创建简单 JWT 令牌的函数 create_access_token
  5. 通过我们刚刚创建的 JWT 令牌,我们可以创建一个依赖项 get_user_from_header 以在一些私有端点中使用

Sebastian Ramirez(FastAPI 的创建者) 有一个很棒的视频,展示了如何向您的应用添加基本授权 FastAPI - Basic HTTP Auth

FastAPI 有一个很好的文档,oauth2-jwt:

对于一些现实世界的例子,fastapi-users 有一个完美的 JWT 身份验证后端。

在朋友和同事的帮助下,我解决了这个问题,并想与社区分享这个解决方案。这是现在的样子:

Python代码----

import json

import os

import datetime

from fastapi import HTTPException, Header

from urllib.request import urlopen

from jose import jwt

from jose import exceptions as JoseExceptions

from utils import logger

AUTH0_DOMAIN = os.environ.get(
    'AUTH0_DOMAIN', 'https://<domain>/<tenant-id>/')

AUTH0_ISSUER = os.environ.get(
    'AUTO0_ISSUER', 'https://sts.windows.net/<tenant>/')

AUTH0_API_AUDIENCE = os.environ.get(
    'AUTH0_API_AUDIENCE', '<audience url>')

AZURE_OPENID_CONFIG = os.environ.get(
    'AZURE_OPENID_CONFIG', 'https://login.microsoftonline.com/common/.well-known/openid-configuration')


def get_token_auth_header(authorization):
    parts = authorization.split()

    if parts[0].lower() != "bearer":
        raise HTTPException(
            status_code=401, 
            detail='Authorization header must start with Bearer')
    elif len(parts) == 1:
        raise HTTPException(
            status_code=401, 
            detail='Authorization token not found')
    elif len(parts) > 2:
        raise HTTPException(
            status_code=401, 
            detail='Authorization header be Bearer token')
    
    token = parts[1]
    return token


def get_payload(unverified_header, token, jwks_properties):
    try:
        payload = jwt.decode(
            token,
            key=jwks_properties["jwks"],
            algorithms=jwks_properties["algorithms"],  # ["RS256"] typically
            audience=AUTH0_API_AUDIENCE,
            issuer=AUTH0_ISSUER
        )
    except jwt.ExpiredSignatureError:
        raise HTTPException(
            status_code=401, 
            detail='Authorization token expired')
    except jwt.JWTClaimsError:
        raise HTTPException(
            status_code=401, 
            detail='Incorrect claims, check the audience and issuer.')
    except Exception:
        raise HTTPException(
            status_code=401, 
            detail='Unable to parse authentication token')

    return payload


class AzureJWKS:
    def __init__(self, openid_config: str=AZURE_OPENID_CONFIG):
        self.openid_url = openid_config
        self._jwks = None
        self._signing_algorithms = []
        self._last_updated = datetime.datetime(2000, 1, 1, 12, 0, 0)
    
    def _refresh_cache(self):
        openid_reader = urlopen(self.openid_url)
        azure_config = json.loads(openid_reader.read())
        self._signing_algorithms = azure_config["id_token_signing_alg_values_supported"]
        jwks_url = azure_config["jwks_uri"]

        jwks_reader = urlopen(jwks_url)
        self._jwks = json.loads(jwks_reader.read())

        logger.info(f"Refreshed jwks config from {jwks_url}.")
        logger.info("Supported token signing algorithms: {}".format(str(self._signing_algorithms)))
        self._last_updated = datetime.datetime.now()

    def get_jwks(self, cache_hours: int=24):
        
            logger.info("jwks config is out of date (last updated at {})".format(str(self._last_updated)))
            self._refresh_cache()
        return {'jwks': self._jwks, 'algorithms': self._signing_algorithms}

jwks_config = AzureJWKS()


async def require_auth(token: str = Header(...)):
    token = get_token_auth_header(token)
   

    try:
        unverified_header = jwt.get_unverified_header(token)
    except JoseExceptions.JWTError:
        raise HTTPException(
                    status_code=401, 
                    detail='Unable to decode authorization token headers')

    payload = get_payload(unverified_header, token, jwks_config.get_jwks())
    if not payload:
        raise HTTPException(
                    status_code=401, 
                    detail='Invalid authorization token')

    return payload

希望社区能从中受益!

我发现可以对已接受的答案进行某些改进:

  • 如果您选择使用 HTTPBearer 安全模式授权 header 内容的格式会自动验证,并且不需要像已接受的答案 get_token_auth_header 中那样的函数。此外,关于身份验证,生成的文档最终非常清晰和解释:

  • 当您解码令牌时,您可以捕获所有属于 class JOSEError 后代的异常并打印它们的消息,避免捕获特定异常并编写自定义消息
  • 好处:在 jwt 解码方法中,您可以指定要忽略的声明,因为您不想验证它们

示例片段: 哪里...

/endpoints
          - hello.py
          - __init__.p
dependency.py
main.py
# dependency.py script
from jose import jwt
from jose.exceptions import JOSEError
from fastapi import HTTPException, Depends
from fastapi.security import HTTPBasicCredentials, HTTPBearer

security = HTTPBearer()

async def has_access(credentials: HTTPBasicCredentials = Depends(security)):
    """
        Function that is used to validate the token in the case that it requires it
    """
    token = credentials.credentials

    try:
        payload = jwt.decode(token, key='secret', options={"verify_signature": False,
                                                           "verify_aud": False,
                                                           "verify_iss": False})
        print("payload => ", payload)
    except JOSEError as e:  # catches any exception
        raise HTTPException(
            status_code=401,
            detail=str(e))
# main.py script
from fastapi import FastAPI, Depends
from endpoints import hello
from dependency import has_access

app = FastAPI()

# routes
PROTECTED = [Depends(has_access)]

app.include_router(
    hello.router,
    prefix="/hello",
    dependencies=PROTECTED
)
# hello.py script
from fastapi import APIRouter

router = APIRouter()

@router.get("")
async def say_hi(name: str):
    return "Hi " + name

通过利用所有提到的功能,您最终构建了一个安全性超快的 API:)