Does authorize_access_token also verify an ID token?

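To retrieve OAuth / OpenID Connect tokens, the authlib docs use the function authorize_access_token. OAuth providers such as Google strongly recommend validating these tokens manually, for example by checking the expiration date.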

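Where is the documentation for authorize_access_token? I couldn't find anything on the website. Does the function validate the tokens automatically, or do I have to validate them myself?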

Authlib tracks the token expiration date when it acquires the token in authorize_access_token, but it does not immediately test whether the token has expired. (It does not expect to fetch an already-expired token!)
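Since the expiry is recorded but not tested at fetch time, a manual check is easy to add. The following is a minimal sketch, assuming the returned token dict carries an `expires_at` Unix timestamp (Authlib normally adds one when the provider sends `expires_in`, but verify this for your provider). The helper name `is_token_expired` and its `leeway` parameter are hypothetical and not part of Authlib.

```python
import time


def is_token_expired(token, leeway=60, now=None):
    """Return True if the token dict's ``expires_at`` is in the past.

    ``leeway`` (seconds) treats tokens that are about to expire as expired.
    Tokens without ``expires_at`` are reported as not expired, because there
    is nothing to compare against.
    """
    expires_at = token.get("expires_at")
    if expires_at is None:
        return False
    if now is None:
        now = time.time()
    return expires_at - leeway <= now


# Fixed "now" values keep the example deterministic.
fresh = {"access_token": "abc", "expires_at": 2_000}
stale = {"access_token": "abc", "expires_at": 1_000}
print(is_token_expired(fresh, now=1_500))
print(is_token_expired(stale, now=1_500))
```

In a real view you would call the helper without `now`, so that it compares against the current time.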

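You are correct that authlib's documentation says little about what the function authorize_access_token() does. There are some hints in the Web OAuth Clients section of the documentation, but that section only mentions the return value of authorize_access_token().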

# this only returns a token
token = oauth.providername.authorize_access_token()

To fully understand what authorize_access_token() does, we need to explore authlib's GitHub repository.

Here is a snippet from the Django client code, which makes the OAuth 2 request.

def authorize_access_token(self, request, **kwargs):
        """Fetch access token in one step.
        :param request: HTTP request instance from Django view.
        :return: A token dict.
        """
        if request.method == 'GET':
            error = request.GET.get('error')
            if error:
                description = request.GET.get('error_description')
                raise OAuthError(error=error, description=description)
            params = {
                'code': request.GET.get('code'),
                'state': request.GET.get('state'),
            }
        else:
            params = {
                'code': request.POST.get('code'),
                'state': request.POST.get('state'),
            }

        state_data = self.framework.get_state_data(request.session, params.get('state'))
        params = self._format_state_params(state_data, params)
        token = self.fetch_access_token(**params, **kwargs)

        if 'id_token' in token and 'nonce' in state_data:
            userinfo = self.parse_id_token(token, nonce=state_data['nonce'])
            token['userinfo'] = userinfo
        return token

In the authorize_access_token() code above we have this call:

error = request.GET.get('error')

This check leads into the base_client code, which includes these validation errors:

  • MismatchingStateError
  • MissingRequestTokenError
  • MissingTokenError

One of the sub-calls in the base_client code is this:

def _on_update_token(self, token, refresh_token=None, access_token=None):
    raise NotImplementedError()

If something goes wrong, authorize_access_token() raises an error:

if error:
   description = request.GET.get('error_description')
   raise OAuthError(error=error, description=description)
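The error branch can be tried out without a web framework. This is a rough sketch that mirrors the GET branch shown earlier using a plain dict in place of `request.GET`. `CallbackError` is a hypothetical stand-in for authlib's `OAuthError`, and `read_callback_params` is an illustrative name, not an Authlib function.

```python
class CallbackError(Exception):
    """Hypothetical stand-in for authlib's OAuthError."""

    def __init__(self, error, description=None):
        super().__init__(f"{error}: {description}")
        self.error = error
        self.description = description


def read_callback_params(query):
    """Extract ``code`` and ``state`` from a callback query dict.

    Raises CallbackError if the provider reported an error instead of
    returning an authorization code.
    """
    error = query.get("error")
    if error:
        raise CallbackError(error, query.get("error_description"))
    return {"code": query.get("code"), "state": query.get("state")}


# Successful callback: the provider sent a code and echoed our state.
ok = read_callback_params({"code": "xyz", "state": "s1"})
print(ok)

# Failed callback: the provider sent an error, so we raise.
try:
    read_callback_params(
        {"error": "access_denied", "error_description": "User cancelled"}
    )
except CallbackError as exc:
    print(exc.error, "-", exc.description)
```

In a real view, wrapping the authorize_access_token() call in a try/except for OAuthError lets you show the user a friendly message instead of a server error.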

Each call in the authorize_access_token() function also performs various kinds of checks using the base_client code, which is extensive.

state_data = self.framework.get_state_data(request.session, params.get('state'))
params = self._format_state_params(state_data, params)
token = self.fetch_access_token(**params, **kwargs)

The first call, which assigns state_data, calls this function:

def get_state_data(self, session, state):
        key = f'_state_{self.name}_{state}'
        if self.cache:
            value = self._get_cache_data(key)
        else:
            value = session.get(key)
        if value:
            return value.get('data')
        return None

The second call, which assigns params, calls this function:

def _format_state_params(state_data, params):
        if state_data is None:
            raise MismatchingStateError()

        code_verifier = state_data.get('code_verifier')
        if code_verifier:
            params['code_verifier'] = code_verifier

        redirect_uri = state_data.get('redirect_uri')
        if redirect_uri:
            params['redirect_uri'] = redirect_uri
        return params
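The two state helpers above work together to reject callbacks whose `state` was never issued. The sketch below reproduces that flow with a plain dict as the session, assuming the same `_state_{name}_{state}` key layout as the excerpt. `StateMismatch`, `save_state`, `load_state` and `merge_state` are hypothetical names used only for illustration.

```python
class StateMismatch(Exception):
    """Hypothetical stand-in for authlib's MismatchingStateError."""


def save_state(session, name, state, data):
    """Store per-request data under a key derived from the state value."""
    session[f"_state_{name}_{state}"] = {"data": data}


def load_state(session, name, state):
    """Return the stored data for this state, or None if it is unknown."""
    value = session.get(f"_state_{name}_{state}")
    return value.get("data") if value else None


def merge_state(state_data, params):
    """Copy code_verifier / redirect_uri into params; reject unknown state."""
    if state_data is None:
        raise StateMismatch()
    merged = dict(params)
    for key in ("code_verifier", "redirect_uri"):
        if state_data.get(key):
            merged[key] = state_data[key]
    return merged


session = {}
save_state(session, "google", "s1",
           {"redirect_uri": "https://example.com/cb", "nonce": "n1"})

# Known state: the stored redirect_uri is merged into the token request.
merged = merge_state(load_state(session, "google", "s1"),
                     {"code": "xyz", "state": "s1"})
print(merged)

# Unknown state: nothing was stored, so the callback is rejected.
try:
    merge_state(load_state(session, "google", "forged"),
                {"code": "xyz", "state": "forged"})
except StateMismatch:
    print("state mismatch: callback rejected")
```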

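The third call, which assigns token, calls this function (shown here in its OAuth 1 form, as the request_token parameter indicates):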

def fetch_access_token(self, request_token=None, **kwargs):
        """Fetch access token in one step.
        :param request_token: A previous request token for OAuth 1.
        :param kwargs: Extra parameters to fetch access token.
        :return: A token dict.
        """
        with self._get_oauth_client() as client:
            if request_token is None:
                raise MissingRequestTokenError()
            # merge request token with verifier
            token = {}
            token.update(request_token)
            token.update(kwargs)
            client.token = token
            params = self.access_token_params or {}
            token = client.fetch_access_token(self.access_token_url, **params)
        return token

The final call in the authorize_access_token() function is parse_id_token, which calls this function in the base_client code:

async def parse_id_token(self, token, nonce, claims_options=None):
        """Return an instance of UserInfo from token's ``id_token``."""
        claims_params = dict(
            nonce=nonce,
            client_id=self.client_id,
        )
        if 'access_token' in token:
            claims_params['access_token'] = token['access_token']
            claims_cls = CodeIDToken
        else:
            claims_cls = ImplicitIDToken

        metadata = await self.load_server_metadata()
        if claims_options is None and 'issuer' in metadata:
            claims_options = {'iss': {'values': [metadata['issuer']]}}

        alg_values = metadata.get('id_token_signing_alg_values_supported')
        if not alg_values:
            alg_values = ['RS256']

        jwt = JsonWebToken(alg_values)

        jwk_set = await self.fetch_jwk_set()
        try:
            claims = jwt.decode(
                token['id_token'],
                key=JsonWebKey.import_key_set(jwk_set),
                claims_cls=claims_cls,
                claims_options=claims_options,
                claims_params=claims_params,
            )
        except ValueError:
            jwk_set = await self.fetch_jwk_set(force=True)
            claims = jwt.decode(
                token['id_token'],
                key=JsonWebKey.import_key_set(jwk_set),
                claims_cls=claims_cls,
                claims_options=claims_options,
                claims_params=claims_params,
            )

        # https://github.com/lepture/authlib/issues/259
        if claims.get('nonce_supported') is False:
            claims.params['nonce'] = None
        claims.validate(leeway=120)
        return UserInfo(claims)
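To make the claim checks in parse_id_token more concrete, here is a simplified sketch of the kind of validation that `claims.validate(leeway=120)` implies: issuer, audience, expiry and nonce. It works on an already-decoded claims dict and deliberately omits signature verification, which authlib performs in `jwt.decode` with the provider's key set. The function name `check_id_token_claims` and the example issuer and client ID are assumptions for illustration, and the real library checks more than this.

```python
import time


def check_id_token_claims(claims, issuer, client_id, nonce=None,
                          leeway=120, now=None):
    """Return a list of problems found in already-decoded ID token claims.

    Signature verification is NOT done here; this only inspects claims.
    """
    if now is None:
        now = time.time()
    problems = []
    if claims.get("iss") != issuer:
        problems.append("iss mismatch")
    aud = claims.get("aud")
    audiences = aud if isinstance(aud, list) else [aud]
    if client_id not in audiences:
        problems.append("aud mismatch")
    exp = claims.get("exp")
    if exp is None or exp + leeway < now:
        problems.append("expired")
    if nonce is not None and claims.get("nonce") != nonce:
        problems.append("nonce mismatch")
    return problems


claims = {
    "iss": "https://accounts.example.com",
    "aud": "my-client",
    "exp": 1_000,
    "nonce": "n1",
}
args = ("https://accounts.example.com", "my-client")

# Within the leeway window: no problems reported.
print(check_id_token_claims(claims, *args, nonce="n1", now=1_100))
# Well past expiry: the token is flagged as expired.
print(check_id_token_claims(claims, *args, nonce="n1", now=2_000))
# Wrong nonce: a possible replay is flagged.
print(check_id_token_claims(claims, *args, nonce="other", now=1_100))
```

Returning a list of problems rather than raising on the first one is a design choice for this sketch; it makes it easier to log everything that is wrong with a token.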

All of the functions above call other functions in the base_client code. For example, here is another part of the code that is reached from the authorize_access_token() function:

from requests import Session
from requests.auth import AuthBase
from authlib.oauth2.client import OAuth2Client
from authlib.oauth2.auth import ClientAuth, TokenAuth
from ..base_client import (
    OAuthError,
    InvalidTokenError,
    MissingTokenError,
    UnsupportedTokenTypeError,
)
from .utils import update_session_configure

__all__ = ['OAuth2Session', 'OAuth2Auth']


class OAuth2Auth(AuthBase, TokenAuth):
    """Sign requests for OAuth 2.0, currently only bearer token is supported."""

    def ensure_active_token(self):
        if self.client and not self.client.ensure_active_token(self.token):
            raise InvalidTokenError()

    def __call__(self, req):
        self.ensure_active_token()
        try:
            req.url, req.headers, req.body = self.prepare(
                req.url, req.headers, req.body)
        except KeyError as error:
            description = 'Unsupported token_type: {}'.format(str(error))
            raise UnsupportedTokenTypeError(description=description)
        return req


class OAuth2ClientAuth(AuthBase, ClientAuth):
    """Attaches OAuth Client Authentication to the given Request object.
    """
    def __call__(self, req):
        req.url, req.headers, req.body = self.prepare(
            req.method, req.url, req.headers, req.body
        )
        return req


class OAuth2Session(OAuth2Client, Session):
    """Construct a new OAuth 2 client requests session.
    :param client_id: Client ID, which you get from client registration.
    :param client_secret: Client Secret, which you get from registration.
    :param authorization_endpoint: URL of the authorization server's
        authorization endpoint.
    :param token_endpoint: URL of the authorization server's token endpoint.
    :param token_endpoint_auth_method: client authentication method for
        token endpoint.
    :param revocation_endpoint: URL of the authorization server's OAuth 2.0
        revocation endpoint.
    :param revocation_endpoint_auth_method: client authentication method for
        revocation endpoint.
    :param scope: Scope that you needed to access user resources.
    :param redirect_uri: Redirect URI you registered as callback.
    :param token: A dict of token attributes such as ``access_token``,
        ``token_type`` and ``expires_at``.
    :param token_placement: The place to put token in HTTP request. Available
        values: "header", "body", "uri".
    :param update_token: A function for you to update token. It accept a
        :class:`OAuth2Token` as parameter.
    """
    client_auth_class = OAuth2ClientAuth
    token_auth_class = OAuth2Auth
    SESSION_REQUEST_PARAMS = (
        'allow_redirects', 'timeout', 'cookies', 'files',
        'proxies', 'hooks', 'stream', 'verify', 'cert', 'json'
    )

    def __init__(self, client_id=None, client_secret=None,
                 token_endpoint_auth_method=None,
                 revocation_endpoint_auth_method=None,
                 scope=None, redirect_uri=None,
                 token=None, token_placement='header',
                 update_token=None, **kwargs):

        Session.__init__(self)
        update_session_configure(self, kwargs)

        OAuth2Client.__init__(
            self, session=self,
            client_id=client_id, client_secret=client_secret,
            token_endpoint_auth_method=token_endpoint_auth_method,
            revocation_endpoint_auth_method=revocation_endpoint_auth_method,
            scope=scope, redirect_uri=redirect_uri,
            token=token, token_placement=token_placement,
            update_token=update_token, **kwargs
        )

    def fetch_access_token(self, url=None, **kwargs):
        """Alias for fetch_token."""
        return self.fetch_token(url, **kwargs)

    def request(self, method, url, withhold_token=False, auth=None, **kwargs):
        """Send request with auto refresh token feature (if available)."""
        if not withhold_token and auth is None:
            if not self.token:
                raise MissingTokenError()
            auth = self.token_auth
        return super(OAuth2Session, self).request(
            method, url, auth=auth, **kwargs)

    @staticmethod
    def handle_error(error_type, error_description):
        raise OAuthError(error_type, error_description)

Here are the registered claims involved in validation:

REGISTERED_CLAIMS = ['iss', 'sub', 'aud', 'exp', 'nbf', 'iat', 'jti']

REGISTERED_CLAIMS = [
        'redirect_uris',
        'token_endpoint_auth_method',
        'grant_types',
        'response_types',
        'client_name',
        'client_uri',
        'logo_uri',
        'scope',
        'contacts',
        'tos_uri',
        'policy_uri',
        'jwks_uri',
        'jwks',
        'software_id',
        'software_version',
    ]

At each stage of the OAuth/OpenID process, authlib performs multiple checks to validate the token.
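One practical consequence of the Django excerpt earlier: `token['userinfo']` is only set after parse_id_token has returned without raising. Under that assumption (which holds for the version quoted here and may differ in other releases), a caller can tell whether the ID token went through validation by inspecting the returned dict. The helper name `id_token_was_validated` is hypothetical.

```python
def id_token_was_validated(token):
    """Heuristic: True if the token dict carries both the raw ID token
    and the ``userinfo`` that is attached after it has been parsed."""
    return "id_token" in token and "userinfo" in token


# Plain OAuth 2 response: no ID token at all.
oauth_only = {"access_token": "abc", "token_type": "Bearer"}
# OpenID Connect response after the ID token was parsed and validated.
oidc = {
    "access_token": "abc",
    "id_token": "<jwt>",
    "userinfo": {"sub": "1234", "email": "user@example.com"},
}
# ID token present but never parsed (for example, no nonce was stored).
unparsed = {"access_token": "abc", "id_token": "<jwt>"}

for name, tok in [("oauth_only", oauth_only), ("oidc", oidc),
                  ("unparsed", unparsed)]:
    print(name, id_token_was_validated(tok))
```

If the helper returns False for a token that contains `id_token`, you would need to call parse_id_token yourself or validate the token by other means.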

I hope my answer helps you better understand the function authorize_access_token().

Happy coding!!