Python 脚本,用于通过 REST API 在 Google Cloud Platform 中更新 Cloud Armor 中的规则
The Python script for updating rules in the Cloud Armor in Google Cloud Platform by REST API
我想编写一个 Python 脚本来更新或添加新规则到 Cloud Armor in Google Cloud Platform using the Compute Engine REST API。
但是我有几个疑惑:
官方Google Cloud Client Library for Python适合这个目的吗(如果不是你提议的)?
为此我应该安装哪个 Python package?
这个standard authentication(JSON文件是否包含我的私钥并正确设置环境变量GOOGLE_APPLICATION_CREDENTIALS
) 足以连接到此 API 以达到我想要达到的目的?
我在问路
Python 脚本是个不错的选择,您可以使用 Google APIs Python client library 来实现。
Python 的 Google 云客户端库可能不够用,但 Google 云客户端库是调用 Google 云的最新和推荐的客户端库API秒。
Compute Engine v1 securityPolicies
REST API resource 提供了您需要的方法列表,例如 addRule
、getRule
和 patchRule
添加新的规则,分别检索和更新现有规则。
查看 PyDoc reference for the Compute Engine API 以获得完整的方法列表以及如何使用它们的说明。
此外,Python Package(Google API Python 客户端库)是您需要使用的。
您提到的 standard authentication 提供了一种有用的方法,用于通过 Python 客户端库授权对计算引擎 API 的请求。
根据 official doc:GCP 客户端库使用一种称为应用程序默认凭据 (ADC) 的策略来查找应用程序的凭据。当您的代码使用客户端库时,该策略会按以下顺序检查您的凭据:
- First, ADC checks to see if the environment variable
GOOGLE_APPLICATION_CREDENTIALS is set. If the variable is set, ADC
uses the service account file that the variable points to. The next
section describes how to set the environment variable.
- If the environment variable isn't set, ADC uses the default service
account that Compute Engine, Kubernetes Engine, App Engine, and
Cloud Functions provide, for applications that run on those
services.
- If ADC can't use either of the above credentials, an error occurs.
最后,确保您选择使用的用户帐户具有在计算引擎上配置 Cloud Armor 所需的正确IAM permissions。
import googleapiclient.discovery
from google.oauth2 import service_account
from pprint import pprint
import logging
if LOCAL == 'True':
credentials = service_account.Credentials.from_service_account_file(
'path-to-service-account.json')
compute = googleapiclient.discovery.build('compute',
'v1',
credentials=credentials)
else:
compute = googleapiclient.discovery.build('compute',
'v1')
class cloud_armor():
def __init__(self, domain):
self.domain = domain
self.project_id = <PROJECT_ID>
self.policy_name = <POLICY_NAME>
def add_rule(self):
#Find minimum current priority
current_policy = cloud_armor.get_policy(self)
current_rules = current_policy['rules']
rule_priorities = []
for rule in current_rules:
rule_priorities.append(rule['priority'])
priority = int(min(rule_priorities)) - 1
body = {
"description": "{}".format(self.domain),
"priority": priority,
"match": {
"expr": {
"expression": "request.headers['referer']=="{}"".format(
self.domain)
}
},
"action": "allow",
"preview": False,
"kind": "compute#securityPolicy"
}
try:
policies = compute.securityPolicies()
rule = policies.addRule(project=self.project_id,
securityPolicy=self.policy_name,
body=body
).execute()
return rule
except Exception as err:
for i in range(0, len(err.args)):
logging.error(err.args[i])
pprint(err.args[i])
print("==Policy Rule Failed to Add==")
raise Exception("Policy Rule Failed to Add")
def get_policy(self):
try:
policies = compute.securityPolicies()
policy = policies.get(project=self.project_id,
securityPolicy=self.policy_name
).execute()
return policy
except Exception as err:
for i in range(err.args):
logging.error(err.args[i])
pprint(err.args[i])
print("==Failed to Fetch Policy==")
raise Exception("Failed to Fetch Policy")
我想编写一个 Python 脚本来更新或添加新规则到 Cloud Armor in Google Cloud Platform using the Compute Engine REST API。
但是我有几个疑惑:
官方Google Cloud Client Library for Python适合这个目的吗(如果不是你提议的)?
为此我应该安装哪个 Python package?
这个standard authentication(JSON文件是否包含我的私钥并正确设置环境变量
GOOGLE_APPLICATION_CREDENTIALS
) 足以连接到此 API 以达到我想要达到的目的?
我在问路
Python 脚本是个不错的选择,您可以使用 Google APIs Python client library 来实现。
Python 的 Google 云客户端库可能不够用,但 Google 云客户端库是调用 Google 云的最新和推荐的客户端库API秒。
Compute Engine v1 securityPolicies
REST API resource 提供了您需要的方法列表,例如 addRule
、getRule
和 patchRule
添加新的规则,分别检索和更新现有规则。
查看 PyDoc reference for the Compute Engine API 以获得完整的方法列表以及如何使用它们的说明。
此外,Python Package(Google API Python 客户端库)是您需要使用的。
您提到的 standard authentication 提供了一种有用的方法,用于通过 Python 客户端库授权对计算引擎 API 的请求。
根据 official doc:GCP 客户端库使用一种称为应用程序默认凭据 (ADC) 的策略来查找应用程序的凭据。当您的代码使用客户端库时,该策略会按以下顺序检查您的凭据:
- First, ADC checks to see if the environment variable GOOGLE_APPLICATION_CREDENTIALS is set. If the variable is set, ADC uses the service account file that the variable points to. The next section describes how to set the environment variable.
- If the environment variable isn't set, ADC uses the default service account that Compute Engine, Kubernetes Engine, App Engine, and Cloud Functions provide, for applications that run on those services.
- If ADC can't use either of the above credentials, an error occurs.
最后,确保您选择使用的用户帐户具有在计算引擎上配置 Cloud Armor 所需的正确IAM permissions。
import googleapiclient.discovery
from google.oauth2 import service_account
from pprint import pprint
import logging
if LOCAL == 'True':
credentials = service_account.Credentials.from_service_account_file(
'path-to-service-account.json')
compute = googleapiclient.discovery.build('compute',
'v1',
credentials=credentials)
else:
compute = googleapiclient.discovery.build('compute',
'v1')
class cloud_armor():
def __init__(self, domain):
self.domain = domain
self.project_id = <PROJECT_ID>
self.policy_name = <POLICY_NAME>
def add_rule(self):
#Find minimum current priority
current_policy = cloud_armor.get_policy(self)
current_rules = current_policy['rules']
rule_priorities = []
for rule in current_rules:
rule_priorities.append(rule['priority'])
priority = int(min(rule_priorities)) - 1
body = {
"description": "{}".format(self.domain),
"priority": priority,
"match": {
"expr": {
"expression": "request.headers['referer']=="{}"".format(
self.domain)
}
},
"action": "allow",
"preview": False,
"kind": "compute#securityPolicy"
}
try:
policies = compute.securityPolicies()
rule = policies.addRule(project=self.project_id,
securityPolicy=self.policy_name,
body=body
).execute()
return rule
except Exception as err:
for i in range(0, len(err.args)):
logging.error(err.args[i])
pprint(err.args[i])
print("==Policy Rule Failed to Add==")
raise Exception("Policy Rule Failed to Add")
def get_policy(self):
try:
policies = compute.securityPolicies()
policy = policies.get(project=self.project_id,
securityPolicy=self.policy_name
).execute()
return policy
except Exception as err:
for i in range(err.args):
logging.error(err.args[i])
pprint(err.args[i])
print("==Failed to Fetch Policy==")
raise Exception("Failed to Fetch Policy")