如何使用 REST API 从服务帐户凭据创建访问令牌?
How do I create an Access Token from Service Account Credentials using REST API?
我在Google Cloud Platform 中创建了一个服务帐户并下载了JSON 格式的私钥。我正在尝试通过 REST API 创建计算资源。出于身份验证目的,我需要一个 AccessToken,它需要设置为创建计算资源 REST API 的 Header。是否有 REST API 从私钥获取访问令牌(不使用 SDK 或 Google 客户端)?
以下示例向您展示了在 Python 中不使用 SDK 调用 Google 云 API 的几个重要步骤。类似的代码几乎适用于任何语言(c#、java、php、nodejs)。
使用您的服务帐户 Json 文件的文件名、您的 Google 区域和您的项目 ID 更改源代码。
此示例将列出指定项目在一个区域中的实例。从这个例子你会知道框架调用一个API来创建GCE实例。
此代码将向您展示如何:
- 如何从 Json 文件加载服务帐户凭据。
- 如何提取用于签署请求的私钥。
- 如何为 Google Oauth 2.0 创建 JWT(Json Web 令牌)。
- 如何设置 Google 范围(权限)。
- 如何签署 JWT 以创建 Signed-JWT (JWS)。
- 如何用 Signed-JWT 交换 Google OAuth 2.0 访问令牌。
- 如何设置过期时间。此程序默认为 3600 秒(1 小时)。
- 如何调用 Google API 并设置授权 Header。
- 如何处理返回的Json结果并显示每个实例的名称。
Python中的示例程序3.x:
'''
This program lists lists the Google Compute Engine Instances in one zone
'''
# Author: John Hanley
# https://www.jhanley.com
import time
import json
import jwt
import requests
import httplib2
# Project ID for this request.
project = 'development-123456'
# The name of the zone for this request.
zone = 'us-west1-a'
# Service Account Credentials, Json format
json_filename = 'service-account.json'
# Permissions to request for Access Token
scopes = "https://www.googleapis.com/auth/cloud-platform"
# Set how long this token will be valid in seconds
expires_in = 3600 # Expires in 1 hour
def load_json_credentials(filename):
''' Load the Google Service Account Credentials from Json file '''
with open(filename, 'r') as f:
data = f.read()
return json.loads(data)
def load_private_key(json_cred):
''' Return the private key from the json credentials '''
return json_cred['private_key']
def create_signed_jwt(pkey, pkey_id, email, scope):
''' Create a Signed JWT from a service account Json credentials file
This Signed JWT will later be exchanged for an Access Token '''
# Google Endpoint for creating OAuth 2.0 Access Tokens from Signed-JWT
auth_url = "https://www.googleapis.com/oauth2/v4/token"
issued = int(time.time())
expires = issued + expires_in # expires_in is in seconds
# Note: this token expires and cannot be refreshed. The token must be recreated
# JWT Headers
additional_headers = {
'kid': pkey_id,
"alg": "RS256",
"typ": "JWT" # Google uses SHA256withRSA
}
# JWT Payload
payload = {
"iss": email, # Issuer claim
"sub": email, # Issuer claim
"aud": auth_url, # Audience claim
"iat": issued, # Issued At claim
"exp": expires, # Expire time
"scope": scope # Permissions
}
# Encode the headers and payload and sign creating a Signed JWT (JWS)
sig = jwt.encode(payload, pkey, algorithm="RS256", headers=additional_headers)
return sig
def exchangeJwtForAccessToken(signed_jwt):
'''
This function takes a Signed JWT and exchanges it for a Google OAuth Access Token
'''
auth_url = "https://www.googleapis.com/oauth2/v4/token"
params = {
"grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer",
"assertion": signed_jwt
}
r = requests.post(auth_url, data=params)
if r.ok:
return(r.json()['access_token'], '')
return None, r.text
def gce_list_instances(accessToken):
'''
This functions lists the Google Compute Engine Instances in one zone
'''
# Endpoint that we will call
url = "https://www.googleapis.com/compute/v1/projects/" + project + "/zones/" + zone + "/instances"
# One of the headers is "Authorization: Bearer $TOKEN"
headers = {
"Host": "www.googleapis.com",
"Authorization": "Bearer " + accessToken,
"Content-Type": "application/json"
}
h = httplib2.Http()
resp, content = h.request(uri=url, method="GET", headers=headers)
status = int(resp.status)
if status < 200 or status >= 300:
print('Error: HTTP Request failed')
return
j = json.loads(content.decode('utf-8').replace('\n', ''))
print('Compute instances in zone', zone)
print('------------------------------------------------------------')
for item in j['items']:
print(item['name'])
if __name__ == '__main__':
cred = load_json_credentials(json_filename)
private_key = load_private_key(cred)
s_jwt = create_signed_jwt(
private_key,
cred['private_key_id'],
cred['client_email'],
scopes)
token, err = exchangeJwtForAccessToken(s_jwt)
if token is None:
print('Error:', err)
exit(1)
gce_list_instances(token)
有关详细信息,请访问我的博客。我写这样的文章并发布源代码,以帮助其他人了解如何为云编写软件。
注意:如评论中所述,这实际上不是问题的解决方案,因为它使用 SDK。无论如何,由于答案对其他用户有用,我没有删除它
有一种更简单的方法可以从服务帐户生成令牌,使用 Google 库
from google.auth.transport import requests
from google.oauth2 import service_account
CREDENTIAL_SCOPES = ["https://www.googleapis.com/auth/cloud-platform"]
CREDENTIALS_KEY_PATH = '/PATH/TO/SERVICE_ACCOUNT.json'
def get_service_account_token():
credentials = service_account.Credentials.from_service_account_file(
CREDENTIALS_KEY_PATH, scopes=CREDENTIAL_SCOPES)
credentials.refresh(requests.Request())
return credentials.token
或者如果您想使用默认身份验证
import google
from google.auth.transport import requests
CREDENTIAL_SCOPES = ["https://www.googleapis.com/auth/cloud-platform"]
def get_default_token():
credentials, project_id = google.auth.default(scopes=CREDENTIAL_SCOPES)
credentials.refresh(requests.Request())
return credentials.token
创建credentialsobject时,token为空,但刷新credentials后,包含了access token可以在 API 请求
中用作 header
相同的解决方案使用 JAVA
import com.google.auth.oauth2.GoogleCredentials;
import java.io.FileInputStream;
import java.io.IOException;
public class GoogleHelper {
public static String getAccessToken() throws IOException {
return GoogleCredentials
.fromStream(new FileInputStream("/PATH/TO/SERVICE_ACCOUNT.json"))
.createScoped("https://www.googleapis.com/auth/cloud-platform")
.refreshAccessToken()
.getTokenValue();
}
}
我在Google Cloud Platform 中创建了一个服务帐户并下载了JSON 格式的私钥。我正在尝试通过 REST API 创建计算资源。出于身份验证目的,我需要一个 AccessToken,它需要设置为创建计算资源 REST API 的 Header。是否有 REST API 从私钥获取访问令牌(不使用 SDK 或 Google 客户端)?
以下示例向您展示了在 Python 中不使用 SDK 调用 Google 云 API 的几个重要步骤。类似的代码几乎适用于任何语言(c#、java、php、nodejs)。
使用您的服务帐户 Json 文件的文件名、您的 Google 区域和您的项目 ID 更改源代码。
此示例将列出指定项目在一个区域中的实例。从这个例子你会知道框架调用一个API来创建GCE实例。
此代码将向您展示如何:
- 如何从 Json 文件加载服务帐户凭据。
- 如何提取用于签署请求的私钥。
- 如何为 Google Oauth 2.0 创建 JWT(Json Web 令牌)。
- 如何设置 Google 范围(权限)。
- 如何签署 JWT 以创建 Signed-JWT (JWS)。
- 如何用 Signed-JWT 交换 Google OAuth 2.0 访问令牌。
- 如何设置过期时间。此程序默认为 3600 秒(1 小时)。
- 如何调用 Google API 并设置授权 Header。
- 如何处理返回的Json结果并显示每个实例的名称。
Python中的示例程序3.x:
'''
This program lists lists the Google Compute Engine Instances in one zone
'''
# Author: John Hanley
# https://www.jhanley.com
import time
import json
import jwt
import requests
import httplib2
# Project ID for this request.
project = 'development-123456'
# The name of the zone for this request.
zone = 'us-west1-a'
# Service Account Credentials, Json format
json_filename = 'service-account.json'
# Permissions to request for Access Token
scopes = "https://www.googleapis.com/auth/cloud-platform"
# Set how long this token will be valid in seconds
expires_in = 3600 # Expires in 1 hour
def load_json_credentials(filename):
''' Load the Google Service Account Credentials from Json file '''
with open(filename, 'r') as f:
data = f.read()
return json.loads(data)
def load_private_key(json_cred):
''' Return the private key from the json credentials '''
return json_cred['private_key']
def create_signed_jwt(pkey, pkey_id, email, scope):
''' Create a Signed JWT from a service account Json credentials file
This Signed JWT will later be exchanged for an Access Token '''
# Google Endpoint for creating OAuth 2.0 Access Tokens from Signed-JWT
auth_url = "https://www.googleapis.com/oauth2/v4/token"
issued = int(time.time())
expires = issued + expires_in # expires_in is in seconds
# Note: this token expires and cannot be refreshed. The token must be recreated
# JWT Headers
additional_headers = {
'kid': pkey_id,
"alg": "RS256",
"typ": "JWT" # Google uses SHA256withRSA
}
# JWT Payload
payload = {
"iss": email, # Issuer claim
"sub": email, # Issuer claim
"aud": auth_url, # Audience claim
"iat": issued, # Issued At claim
"exp": expires, # Expire time
"scope": scope # Permissions
}
# Encode the headers and payload and sign creating a Signed JWT (JWS)
sig = jwt.encode(payload, pkey, algorithm="RS256", headers=additional_headers)
return sig
def exchangeJwtForAccessToken(signed_jwt):
'''
This function takes a Signed JWT and exchanges it for a Google OAuth Access Token
'''
auth_url = "https://www.googleapis.com/oauth2/v4/token"
params = {
"grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer",
"assertion": signed_jwt
}
r = requests.post(auth_url, data=params)
if r.ok:
return(r.json()['access_token'], '')
return None, r.text
def gce_list_instances(accessToken):
'''
This functions lists the Google Compute Engine Instances in one zone
'''
# Endpoint that we will call
url = "https://www.googleapis.com/compute/v1/projects/" + project + "/zones/" + zone + "/instances"
# One of the headers is "Authorization: Bearer $TOKEN"
headers = {
"Host": "www.googleapis.com",
"Authorization": "Bearer " + accessToken,
"Content-Type": "application/json"
}
h = httplib2.Http()
resp, content = h.request(uri=url, method="GET", headers=headers)
status = int(resp.status)
if status < 200 or status >= 300:
print('Error: HTTP Request failed')
return
j = json.loads(content.decode('utf-8').replace('\n', ''))
print('Compute instances in zone', zone)
print('------------------------------------------------------------')
for item in j['items']:
print(item['name'])
if __name__ == '__main__':
cred = load_json_credentials(json_filename)
private_key = load_private_key(cred)
s_jwt = create_signed_jwt(
private_key,
cred['private_key_id'],
cred['client_email'],
scopes)
token, err = exchangeJwtForAccessToken(s_jwt)
if token is None:
print('Error:', err)
exit(1)
gce_list_instances(token)
有关详细信息,请访问我的博客。我写这样的文章并发布源代码,以帮助其他人了解如何为云编写软件。
注意:如评论中所述,这实际上不是问题的解决方案,因为它使用 SDK。无论如何,由于答案对其他用户有用,我没有删除它
有一种更简单的方法可以从服务帐户生成令牌,使用 Google 库
from google.auth.transport import requests
from google.oauth2 import service_account
CREDENTIAL_SCOPES = ["https://www.googleapis.com/auth/cloud-platform"]
CREDENTIALS_KEY_PATH = '/PATH/TO/SERVICE_ACCOUNT.json'
def get_service_account_token():
credentials = service_account.Credentials.from_service_account_file(
CREDENTIALS_KEY_PATH, scopes=CREDENTIAL_SCOPES)
credentials.refresh(requests.Request())
return credentials.token
或者如果您想使用默认身份验证
import google
from google.auth.transport import requests
CREDENTIAL_SCOPES = ["https://www.googleapis.com/auth/cloud-platform"]
def get_default_token():
credentials, project_id = google.auth.default(scopes=CREDENTIAL_SCOPES)
credentials.refresh(requests.Request())
return credentials.token
创建credentialsobject时,token为空,但刷新credentials后,包含了access token可以在 API 请求
中用作 header相同的解决方案使用 JAVA
import com.google.auth.oauth2.GoogleCredentials;
import java.io.FileInputStream;
import java.io.IOException;
public class GoogleHelper {
public static String getAccessToken() throws IOException {
return GoogleCredentials
.fromStream(new FileInputStream("/PATH/TO/SERVICE_ACCOUNT.json"))
.createScoped("https://www.googleapis.com/auth/cloud-platform")
.refreshAccessToken()
.getTokenValue();
}
}