authorize_access_token 是否也验证 id 令牌?
Does authorize_access_token also verify an id token?
为了检索 OAuth / OpenID Connect 令牌,在 authlib docs 中使用了函数 authorize_access_token
。 Google 等 OAuth 提供商强烈建议手动验证这些令牌,例如通过检查到期日期。
authorize_access_token
的文档在哪里?我在网站上找不到任何东西。该函数是自动验证令牌还是我必须自己验证?
Authlib tracks the token expiration date when it acquires the key in authorize_access_token
,但它不会立即测试密钥是否过期。 (它不希望提取过期的密钥!)
您是正确的,authlib 的文档对函数 authorize_access_token() 的作用很简单。在文档的 Web OAuth Client 选择中有一些指示,但该部分仅提及 authorize_access_token() 的 return 值。
# this only returns a token
token = oauth.providername.authorize_access_token()
为了充分理解 authorize_access_token() 的功能,我们需要探索 authlib 的 GitHub存储库。
这是来自 django 客户端代码的一段代码,它发出 OAuth 2 请求。
def authorize_access_token(self, request, **kwargs):
"""Fetch access token in one step.
:param request: HTTP request instance from Django view.
:return: A token dict.
"""
if request.method == 'GET':
error = request.GET.get('error')
if error:
description = request.GET.get('error_description')
raise OAuthError(error=error, description=description)
params = {
'code': request.GET.get('code'),
'state': request.GET.get('state'),
}
else:
params = {
'code': request.POST.get('code'),
'state': request.POST.get('state'),
}
state_data = self.framework.get_state_data(request.session, params.get('state'))
params = self._format_state_params(state_data, params)
token = self.fetch_access_token(**params, **kwargs)
if 'id_token' in token and 'nonce' in state_data:
userinfo = self.parse_id_token(token, nonce=state_data['nonce'])
token['userinfo'] = userinfo
return token
在上面的 authorize_access_token() 代码中我们有这个调用:
error = request.GET.get('error')
访问base_client代码段的调用。其中有这些验证:
- 不匹配状态错误
- MissingRequestTokenError
- MissingTokenError
base_client代码中的一个子调用是这样的:
def _on_update_token(self, token, refresh_token=None, access_token=None):
raise NotImplementedError()
如果出现问题,authorize_access_token() 将抛出错误:
if error:
description = request.GET.get('error_description')
raise OAuthError(error=error, description=description)
authorize_access_token() 函数中的每个调用也使用 base_client[= 进行各种类型的检查85=] 代码,内容广泛。
state_data = self.framework.get_state_data(request.session, params.get('state'))
params = self._format_state_params(state_data, params)
token = self.fetch_access_token(**params, **kwargs)
第一次调用state_data调用这个函数:
def get_state_data(self, session, state):
key = f'_state_{self.name}_{state}'
if self.cache:
value = self._get_cache_data(key)
else:
value = session.get(key)
if value:
return value.get('data')
return None
第二次调用params调用这个函数:
def _format_state_params(state_data, params):
if state_data is None:
raise MismatchingStateError()
code_verifier = state_data.get('code_verifier')
if code_verifier:
params['code_verifier'] = code_verifier
redirect_uri = state_data.get('redirect_uri')
if redirect_uri:
params['redirect_uri'] = redirect_uri
return params
第三次调用token调用这个函数:
def fetch_access_token(self, request_token=None, **kwargs):
"""Fetch access token in one step.
:param request_token: A previous request token for OAuth 1.
:param kwargs: Extra parameters to fetch access token.
:return: A token dict.
"""
with self._get_oauth_client() as client:
if request_token is None:
raise MissingRequestTokenError()
# merge request token with verifier
token = {}
token.update(request_token)
token.update(kwargs)
client.token = token
params = self.access_token_params or {}
token = client.fetch_access_token(self.access_token_url, **params)
return token
authorize_access_token()函数中的最后调用是
parse_id_token,在base_client代码中调用此函数:
async def parse_id_token(self, token, nonce, claims_options=None):
"""Return an instance of UserInfo from token's ``id_token``."""
claims_params = dict(
nonce=nonce,
client_id=self.client_id,
)
if 'access_token' in token:
claims_params['access_token'] = token['access_token']
claims_cls = CodeIDToken
else:
claims_cls = ImplicitIDToken
metadata = await self.load_server_metadata()
if claims_options is None and 'issuer' in metadata:
claims_options = {'iss': {'values': [metadata['issuer']]}}
alg_values = metadata.get('id_token_signing_alg_values_supported')
if not alg_values:
alg_values = ['RS256']
jwt = JsonWebToken(alg_values)
jwk_set = await self.fetch_jwk_set()
try:
claims = jwt.decode(
token['id_token'],
key=JsonWebKey.import_key_set(jwk_set),
claims_cls=claims_cls,
claims_options=claims_options,
claims_params=claims_params,
)
except ValueError:
jwk_set = await self.fetch_jwk_set(force=True)
claims = jwt.decode(
token['id_token'],
key=JsonWebKey.import_key_set(jwk_set),
claims_cls=claims_cls,
claims_options=claims_options,
claims_params=claims_params,
)
# https://github.com/lepture/authlib/issues/259
if claims.get('nonce_supported') is False:
claims.params['nonce'] = None
claims.validate(leeway=120)
return UserInfo(claims)
在上述所有函数中,在 base_client 代码中调用了其他函数。例如,这里是从 authorize_access_token() 函数访问的代码的另一部分:
from requests import Session
from requests.auth import AuthBase
from authlib.oauth2.client import OAuth2Client
from authlib.oauth2.auth import ClientAuth, TokenAuth
from ..base_client import (
OAuthError,
InvalidTokenError,
MissingTokenError,
UnsupportedTokenTypeError,
)
from .utils import update_session_configure
__all__ = ['OAuth2Session', 'OAuth2Auth']
class OAuth2Auth(AuthBase, TokenAuth):
"""Sign requests for OAuth 2.0, currently only bearer token is supported."""
def ensure_active_token(self):
if self.client and not self.client.ensure_active_token(self.token):
raise InvalidTokenError()
def __call__(self, req):
self.ensure_active_token()
try:
req.url, req.headers, req.body = self.prepare(
req.url, req.headers, req.body)
except KeyError as error:
description = 'Unsupported token_type: {}'.format(str(error))
raise UnsupportedTokenTypeError(description=description)
return req
class OAuth2ClientAuth(AuthBase, ClientAuth):
"""Attaches OAuth Client Authentication to the given Request object.
"""
def __call__(self, req):
req.url, req.headers, req.body = self.prepare(
req.method, req.url, req.headers, req.body
)
return req
class OAuth2Session(OAuth2Client, Session):
"""Construct a new OAuth 2 client requests session.
:param client_id: Client ID, which you get from client registration.
:param client_secret: Client Secret, which you get from registration.
:param authorization_endpoint: URL of the authorization server's
authorization endpoint.
:param token_endpoint: URL of the authorization server's token endpoint.
:param token_endpoint_auth_method: client authentication method for
token endpoint.
:param revocation_endpoint: URL of the authorization server's OAuth 2.0
revocation endpoint.
:param revocation_endpoint_auth_method: client authentication method for
revocation endpoint.
:param scope: Scope that you needed to access user resources.
:param redirect_uri: Redirect URI you registered as callback.
:param token: A dict of token attributes such as ``access_token``,
``token_type`` and ``expires_at``.
:param token_placement: The place to put token in HTTP request. Available
values: "header", "body", "uri".
:param update_token: A function for you to update token. It accept a
:class:`OAuth2Token` as parameter.
"""
client_auth_class = OAuth2ClientAuth
token_auth_class = OAuth2Auth
SESSION_REQUEST_PARAMS = (
'allow_redirects', 'timeout', 'cookies', 'files',
'proxies', 'hooks', 'stream', 'verify', 'cert', 'json'
)
def __init__(self, client_id=None, client_secret=None,
token_endpoint_auth_method=None,
revocation_endpoint_auth_method=None,
scope=None, redirect_uri=None,
token=None, token_placement='header',
update_token=None, **kwargs):
Session.__init__(self)
update_session_configure(self, kwargs)
OAuth2Client.__init__(
self, session=self,
client_id=client_id, client_secret=client_secret,
token_endpoint_auth_method=token_endpoint_auth_method,
revocation_endpoint_auth_method=revocation_endpoint_auth_method,
scope=scope, redirect_uri=redirect_uri,
token=token, token_placement=token_placement,
update_token=update_token, **kwargs
)
def fetch_access_token(self, url=None, **kwargs):
"""Alias for fetch_token."""
return self.fetch_token(url, **kwargs)
def request(self, method, url, withhold_token=False, auth=None, **kwargs):
"""Send request with auto refresh token feature (if available)."""
if not withhold_token and auth is None:
if not self.token:
raise MissingTokenError()
auth = self.token_auth
return super(OAuth2Session, self).request(
method, url, auth=auth, **kwargs)
@staticmethod
def handle_error(error_type, error_description):
raise OAuthError(error_type, error_description)
以下是与验证相关的声明:
REGISTERED_CLAIMS = ['iss', 'sub', 'aud', 'exp', 'nbf', 'iat', 'jti']
REGISTERED_CLAIMS = [
'redirect_uris',
'token_endpoint_auth_method',
'grant_types',
'response_types',
'client_name',
'client_uri',
'logo_uri',
'scope',
'contacts',
'tos_uri',
'policy_uri',
'jwks_uri',
'jwks',
'software_id',
'software_version',
]
在 OAuth/OpenID 过程的每个阶段 authlib 正在执行多项检查以验证令牌。
希望我的回答能帮助您更好地理解函数authorize_access_token()。
编码愉快!!
为了检索 OAuth / OpenID Connect 令牌,在 authlib docs 中使用了函数 authorize_access_token
。 Google 等 OAuth 提供商强烈建议手动验证这些令牌,例如通过检查到期日期。
authorize_access_token
的文档在哪里?我在网站上找不到任何东西。该函数是自动验证令牌还是我必须自己验证?
Authlib tracks the token expiration date when it acquires the key in authorize_access_token
,但它不会立即测试密钥是否过期。 (它不希望提取过期的密钥!)
您是正确的,authlib 的文档对函数 authorize_access_token() 的作用很简单。在文档的 Web OAuth Client 选择中有一些指示,但该部分仅提及 authorize_access_token() 的 return 值。
# this only returns a token
token = oauth.providername.authorize_access_token()
为了充分理解 authorize_access_token() 的功能,我们需要探索 authlib 的 GitHub存储库。
这是来自 django 客户端代码的一段代码,它发出 OAuth 2 请求。
def authorize_access_token(self, request, **kwargs):
"""Fetch access token in one step.
:param request: HTTP request instance from Django view.
:return: A token dict.
"""
if request.method == 'GET':
error = request.GET.get('error')
if error:
description = request.GET.get('error_description')
raise OAuthError(error=error, description=description)
params = {
'code': request.GET.get('code'),
'state': request.GET.get('state'),
}
else:
params = {
'code': request.POST.get('code'),
'state': request.POST.get('state'),
}
state_data = self.framework.get_state_data(request.session, params.get('state'))
params = self._format_state_params(state_data, params)
token = self.fetch_access_token(**params, **kwargs)
if 'id_token' in token and 'nonce' in state_data:
userinfo = self.parse_id_token(token, nonce=state_data['nonce'])
token['userinfo'] = userinfo
return token
在上面的 authorize_access_token() 代码中我们有这个调用:
error = request.GET.get('error')
访问base_client代码段的调用。其中有这些验证:
- 不匹配状态错误
- MissingRequestTokenError
- MissingTokenError
base_client代码中的一个子调用是这样的:
def _on_update_token(self, token, refresh_token=None, access_token=None):
raise NotImplementedError()
如果出现问题,authorize_access_token() 将抛出错误:
if error:
description = request.GET.get('error_description')
raise OAuthError(error=error, description=description)
authorize_access_token() 函数中的每个调用也使用 base_client[= 进行各种类型的检查85=] 代码,内容广泛。
state_data = self.framework.get_state_data(request.session, params.get('state'))
params = self._format_state_params(state_data, params)
token = self.fetch_access_token(**params, **kwargs)
第一次调用state_data调用这个函数:
def get_state_data(self, session, state):
key = f'_state_{self.name}_{state}'
if self.cache:
value = self._get_cache_data(key)
else:
value = session.get(key)
if value:
return value.get('data')
return None
第二次调用params调用这个函数:
def _format_state_params(state_data, params):
if state_data is None:
raise MismatchingStateError()
code_verifier = state_data.get('code_verifier')
if code_verifier:
params['code_verifier'] = code_verifier
redirect_uri = state_data.get('redirect_uri')
if redirect_uri:
params['redirect_uri'] = redirect_uri
return params
第三次调用token调用这个函数:
def fetch_access_token(self, request_token=None, **kwargs):
"""Fetch access token in one step.
:param request_token: A previous request token for OAuth 1.
:param kwargs: Extra parameters to fetch access token.
:return: A token dict.
"""
with self._get_oauth_client() as client:
if request_token is None:
raise MissingRequestTokenError()
# merge request token with verifier
token = {}
token.update(request_token)
token.update(kwargs)
client.token = token
params = self.access_token_params or {}
token = client.fetch_access_token(self.access_token_url, **params)
return token
authorize_access_token()函数中的最后调用是 parse_id_token,在base_client代码中调用此函数:
async def parse_id_token(self, token, nonce, claims_options=None):
"""Return an instance of UserInfo from token's ``id_token``."""
claims_params = dict(
nonce=nonce,
client_id=self.client_id,
)
if 'access_token' in token:
claims_params['access_token'] = token['access_token']
claims_cls = CodeIDToken
else:
claims_cls = ImplicitIDToken
metadata = await self.load_server_metadata()
if claims_options is None and 'issuer' in metadata:
claims_options = {'iss': {'values': [metadata['issuer']]}}
alg_values = metadata.get('id_token_signing_alg_values_supported')
if not alg_values:
alg_values = ['RS256']
jwt = JsonWebToken(alg_values)
jwk_set = await self.fetch_jwk_set()
try:
claims = jwt.decode(
token['id_token'],
key=JsonWebKey.import_key_set(jwk_set),
claims_cls=claims_cls,
claims_options=claims_options,
claims_params=claims_params,
)
except ValueError:
jwk_set = await self.fetch_jwk_set(force=True)
claims = jwt.decode(
token['id_token'],
key=JsonWebKey.import_key_set(jwk_set),
claims_cls=claims_cls,
claims_options=claims_options,
claims_params=claims_params,
)
# https://github.com/lepture/authlib/issues/259
if claims.get('nonce_supported') is False:
claims.params['nonce'] = None
claims.validate(leeway=120)
return UserInfo(claims)
在上述所有函数中,在 base_client 代码中调用了其他函数。例如,这里是从 authorize_access_token() 函数访问的代码的另一部分:
from requests import Session
from requests.auth import AuthBase
from authlib.oauth2.client import OAuth2Client
from authlib.oauth2.auth import ClientAuth, TokenAuth
from ..base_client import (
OAuthError,
InvalidTokenError,
MissingTokenError,
UnsupportedTokenTypeError,
)
from .utils import update_session_configure
__all__ = ['OAuth2Session', 'OAuth2Auth']
class OAuth2Auth(AuthBase, TokenAuth):
"""Sign requests for OAuth 2.0, currently only bearer token is supported."""
def ensure_active_token(self):
if self.client and not self.client.ensure_active_token(self.token):
raise InvalidTokenError()
def __call__(self, req):
self.ensure_active_token()
try:
req.url, req.headers, req.body = self.prepare(
req.url, req.headers, req.body)
except KeyError as error:
description = 'Unsupported token_type: {}'.format(str(error))
raise UnsupportedTokenTypeError(description=description)
return req
class OAuth2ClientAuth(AuthBase, ClientAuth):
"""Attaches OAuth Client Authentication to the given Request object.
"""
def __call__(self, req):
req.url, req.headers, req.body = self.prepare(
req.method, req.url, req.headers, req.body
)
return req
class OAuth2Session(OAuth2Client, Session):
"""Construct a new OAuth 2 client requests session.
:param client_id: Client ID, which you get from client registration.
:param client_secret: Client Secret, which you get from registration.
:param authorization_endpoint: URL of the authorization server's
authorization endpoint.
:param token_endpoint: URL of the authorization server's token endpoint.
:param token_endpoint_auth_method: client authentication method for
token endpoint.
:param revocation_endpoint: URL of the authorization server's OAuth 2.0
revocation endpoint.
:param revocation_endpoint_auth_method: client authentication method for
revocation endpoint.
:param scope: Scope that you needed to access user resources.
:param redirect_uri: Redirect URI you registered as callback.
:param token: A dict of token attributes such as ``access_token``,
``token_type`` and ``expires_at``.
:param token_placement: The place to put token in HTTP request. Available
values: "header", "body", "uri".
:param update_token: A function for you to update token. It accept a
:class:`OAuth2Token` as parameter.
"""
client_auth_class = OAuth2ClientAuth
token_auth_class = OAuth2Auth
SESSION_REQUEST_PARAMS = (
'allow_redirects', 'timeout', 'cookies', 'files',
'proxies', 'hooks', 'stream', 'verify', 'cert', 'json'
)
def __init__(self, client_id=None, client_secret=None,
token_endpoint_auth_method=None,
revocation_endpoint_auth_method=None,
scope=None, redirect_uri=None,
token=None, token_placement='header',
update_token=None, **kwargs):
Session.__init__(self)
update_session_configure(self, kwargs)
OAuth2Client.__init__(
self, session=self,
client_id=client_id, client_secret=client_secret,
token_endpoint_auth_method=token_endpoint_auth_method,
revocation_endpoint_auth_method=revocation_endpoint_auth_method,
scope=scope, redirect_uri=redirect_uri,
token=token, token_placement=token_placement,
update_token=update_token, **kwargs
)
def fetch_access_token(self, url=None, **kwargs):
"""Alias for fetch_token."""
return self.fetch_token(url, **kwargs)
def request(self, method, url, withhold_token=False, auth=None, **kwargs):
"""Send request with auto refresh token feature (if available)."""
if not withhold_token and auth is None:
if not self.token:
raise MissingTokenError()
auth = self.token_auth
return super(OAuth2Session, self).request(
method, url, auth=auth, **kwargs)
@staticmethod
def handle_error(error_type, error_description):
raise OAuthError(error_type, error_description)
以下是与验证相关的声明:
REGISTERED_CLAIMS = ['iss', 'sub', 'aud', 'exp', 'nbf', 'iat', 'jti']
REGISTERED_CLAIMS = [
'redirect_uris',
'token_endpoint_auth_method',
'grant_types',
'response_types',
'client_name',
'client_uri',
'logo_uri',
'scope',
'contacts',
'tos_uri',
'policy_uri',
'jwks_uri',
'jwks',
'software_id',
'software_version',
]
在 OAuth/OpenID 过程的每个阶段 authlib 正在执行多项检查以验证令牌。
希望我的回答能帮助您更好地理解函数authorize_access_token()。
编码愉快!!