Django Oauth Toolkit:自省用户数据
Django Oauth Toolkit: User data over introspection
当前情况:
我正在使用 Introspect 来验证身份验证服务器上的访问令牌。此调用仅 returns 来自身份验证服务器的用户 'username' 并将其保存在资源服务器中。同一用户在认证服务器和资源服务器上的Id不一定相同
期望的场景:
我想接收更多有关用户的数据(电子邮件、phone 号码、地址等)并将其保存在资源服务器中。
到目前为止我做了什么:
我将 django-oauth-toolkit/oauth2_provider/views/introspect.py/ get_token_response
修改为 return 我需要的数据。
还剩:
如何将这些数据保存在资源服务器中?还是在我需要用户数据时向身份验证服务器发出 api 调用更好?
我通过修改 Auth-Server
的 IntrospectTokenView 中的 get_token_response 实现了这一点
def get_token_response(token_value=None):
try:
token = get_access_token_model().objects.select_related(
"user", "application"
).get(token=token_value)
except ObjectDoesNotExist:
return HttpResponse(
content=json.dumps({"active": False}),
status=401,
content_type="application/json"
)
else:
if token.is_valid():
data = {
"active": True,
"scope": token.scope,
"exp": int(calendar.timegm(token.expires.timetuple())),
}
if token.application:
data["client_id"] = token.application.client_id
if token.user:
data["username"] = token.user.get_username()
# TODO: DANGER ZONE
# Pass extra parameters
# ------------------------------------------------------------------------------
data["email"] = token.user.email
data["phone_number"] = token.user.phone_number
data["is_company"] = token.user.is_company
customer = token.user.customer
data["designation"] = customer.designation
company = customer.company
data["company"] = company.company_name
# ------------------------------------------------------------------------------
return HttpResponse(content=json.dumps(data), status=200, content_type="application/json")
else:
return HttpResponse(content=json.dumps({
"active": False,
}), status=200, content_type="application/json")
资源服务器中的 OAuth2Validator 中的 和 _get_token_from_authentication_server
def _get_token_from_authentication_server(
self, token, introspection_url, introspection_token, introspection_credentials
):
headers = None
if introspection_token:
headers = {"Authorization": "Bearer {}".format(introspection_token)}
elif introspection_credentials:
client_id = introspection_credentials[0].encode("utf-8")
client_secret = introspection_credentials[1].encode("utf-8")
basic_auth = base64.b64encode(client_id + b":" + client_secret)
headers = {"Authorization": "Basic {}".format(basic_auth.decode("utf-8"))}
try:
response = requests.post(
introspection_url,
data={"token": token}, headers=headers
)
except requests.exceptions.RequestException:
log.exception("Introspection: Failed POST to %r in token lookup", introspection_url)
return None
# Log an exception when response from auth server is not successful
if response.status_code != http.client.OK:
log.exception("Introspection: Failed to get a valid response "
"from authentication server. Status code: {}, "
"Reason: {}.".format(response.status_code,
response.reason))
return None
try:
content = response.json()
except ValueError:
log.exception("Introspection: Failed to parse response as json")
return None
if "active" in content and content["active"] is True:
if "username" in content:
user, _created = UserModel.objects.get_or_create(
**{UserModel.USERNAME_FIELD: content["username"]}
)
# TODO: DANGER ZONE
# Adding extra data to user profile and create company
# ------------------------------------------------------------------------------
user.email = content["email"]
user.phone_number = content["phone_number"]
user.is_company = content["is_company"]
customer, _created_customer = CustomerModel.objects.get_or_create(
user = user
)
customer.designation = content["designation"]
company, _created_company = CompanyModel.objects.get_or_create(
company_name = content["company"]
)
customer.company = company
customer.save()
user.save()
# ------------------------------------------------------------------------------
else:
user = None
max_caching_time = datetime.now() + timedelta(
seconds=oauth2_settings.RESOURCE_SERVER_TOKEN_CACHING_SECONDS
)
if "exp" in content:
expires = datetime.utcfromtimestamp(content["exp"])
if expires > max_caching_time:
expires = max_caching_time
else:
expires = max_caching_time
scope = content.get("scope", "")
expires = make_aware(expires)
access_token, _created = AccessToken.objects.update_or_create(
token=token,
defaults={
"user": user,
"application": None,
"scope": scope,
"expires": expires,
})
return access_token
。
现在我想知道如何扩展 类 并添加额外的代码而不是直接修改源代码?感谢任何帮助。
当前情况:
我正在使用 Introspect 来验证身份验证服务器上的访问令牌。此调用仅 returns 来自身份验证服务器的用户 'username' 并将其保存在资源服务器中。同一用户在认证服务器和资源服务器上的Id不一定相同
期望的场景:
我想接收更多有关用户的数据(电子邮件、phone 号码、地址等)并将其保存在资源服务器中。
到目前为止我做了什么:
我将 django-oauth-toolkit/oauth2_provider/views/introspect.py/ get_token_response
修改为 return 我需要的数据。
还剩:
如何将这些数据保存在资源服务器中?还是在我需要用户数据时向身份验证服务器发出 api 调用更好?
我通过修改 Auth-Server
的 IntrospectTokenView 中的 get_token_response 实现了这一点def get_token_response(token_value=None):
try:
token = get_access_token_model().objects.select_related(
"user", "application"
).get(token=token_value)
except ObjectDoesNotExist:
return HttpResponse(
content=json.dumps({"active": False}),
status=401,
content_type="application/json"
)
else:
if token.is_valid():
data = {
"active": True,
"scope": token.scope,
"exp": int(calendar.timegm(token.expires.timetuple())),
}
if token.application:
data["client_id"] = token.application.client_id
if token.user:
data["username"] = token.user.get_username()
# TODO: DANGER ZONE
# Pass extra parameters
# ------------------------------------------------------------------------------
data["email"] = token.user.email
data["phone_number"] = token.user.phone_number
data["is_company"] = token.user.is_company
customer = token.user.customer
data["designation"] = customer.designation
company = customer.company
data["company"] = company.company_name
# ------------------------------------------------------------------------------
return HttpResponse(content=json.dumps(data), status=200, content_type="application/json")
else:
return HttpResponse(content=json.dumps({
"active": False,
}), status=200, content_type="application/json")
资源服务器中的 OAuth2Validator 中的 和 _get_token_from_authentication_server
def _get_token_from_authentication_server(
self, token, introspection_url, introspection_token, introspection_credentials
):
headers = None
if introspection_token:
headers = {"Authorization": "Bearer {}".format(introspection_token)}
elif introspection_credentials:
client_id = introspection_credentials[0].encode("utf-8")
client_secret = introspection_credentials[1].encode("utf-8")
basic_auth = base64.b64encode(client_id + b":" + client_secret)
headers = {"Authorization": "Basic {}".format(basic_auth.decode("utf-8"))}
try:
response = requests.post(
introspection_url,
data={"token": token}, headers=headers
)
except requests.exceptions.RequestException:
log.exception("Introspection: Failed POST to %r in token lookup", introspection_url)
return None
# Log an exception when response from auth server is not successful
if response.status_code != http.client.OK:
log.exception("Introspection: Failed to get a valid response "
"from authentication server. Status code: {}, "
"Reason: {}.".format(response.status_code,
response.reason))
return None
try:
content = response.json()
except ValueError:
log.exception("Introspection: Failed to parse response as json")
return None
if "active" in content and content["active"] is True:
if "username" in content:
user, _created = UserModel.objects.get_or_create(
**{UserModel.USERNAME_FIELD: content["username"]}
)
# TODO: DANGER ZONE
# Adding extra data to user profile and create company
# ------------------------------------------------------------------------------
user.email = content["email"]
user.phone_number = content["phone_number"]
user.is_company = content["is_company"]
customer, _created_customer = CustomerModel.objects.get_or_create(
user = user
)
customer.designation = content["designation"]
company, _created_company = CompanyModel.objects.get_or_create(
company_name = content["company"]
)
customer.company = company
customer.save()
user.save()
# ------------------------------------------------------------------------------
else:
user = None
max_caching_time = datetime.now() + timedelta(
seconds=oauth2_settings.RESOURCE_SERVER_TOKEN_CACHING_SECONDS
)
if "exp" in content:
expires = datetime.utcfromtimestamp(content["exp"])
if expires > max_caching_time:
expires = max_caching_time
else:
expires = max_caching_time
scope = content.get("scope", "")
expires = make_aware(expires)
access_token, _created = AccessToken.objects.update_or_create(
token=token,
defaults={
"user": user,
"application": None,
"scope": scope,
"expires": expires,
})
return access_token
。 现在我想知道如何扩展 类 并添加额外的代码而不是直接修改源代码?感谢任何帮助。