Python 脚本,用于通过 REST API 在 Google Cloud Platform 中更新 Cloud Armor 中的规则

The Python script for updating rules in the Cloud Armor in Google Cloud Platform by REST API

我想编写一个 Python 脚本来更新或添加新规则到 Cloud Armor in Google Cloud Platform using the Compute Engine REST API

但是我有几个疑惑:

我在问路

Python 脚本是个不错的选择,您可以使用 Google APIs Python client library 来实现。

Python 的 Google 云客户端库可能不够用,但 Google 云客户端库是调用 Google 云的最新和推荐的客户端库API秒。

Compute Engine v1 securityPolicies REST API resource 提供了您需要的方法列表,例如 addRulegetRulepatchRule 添加新的规则,分别检索和更新现有规则。

查看 PyDoc reference for the Compute Engine API 以获得完整的方法列表以及如何使用它们的说明。

此外,Python Package(Google API Python 客户端库)是您需要使用的。

您提到的 standard authentication 提供了一种有用的方法,用于通过 Python 客户端库授权对计算引擎 API 的请求。

根据 official doc:GCP 客户端库使用一种称为应用程序默认凭据 (ADC) 的策略来查找应用程序的凭据。当您的代码使用客户端库时,该策略会按以下顺序检查您的凭据:

  1. First, ADC checks to see if the environment variable GOOGLE_APPLICATION_CREDENTIALS is set. If the variable is set, ADC uses the service account file that the variable points to. The next section describes how to set the environment variable.
  2. If the environment variable isn't set, ADC uses the default service account that Compute Engine, Kubernetes Engine, App Engine, and Cloud Functions provide, for applications that run on those services.
  3. If ADC can't use either of the above credentials, an error occurs.

最后,确保您选择使用的用户帐户具有在计算引擎上配置 Cloud Armor 所需的正确IAM permissions


import googleapiclient.discovery
from google.oauth2 import service_account
from pprint import pprint
import logging

if LOCAL == 'True':
    credentials = service_account.Credentials.from_service_account_file(
        'path-to-service-account.json')
    compute = googleapiclient.discovery.build('compute',
                                              'v1',
                                              credentials=credentials)
else:
    compute = googleapiclient.discovery.build('compute',
                                              'v1')


class cloud_armor():

    def __init__(self, domain):
        self.domain = domain
        self.project_id = <PROJECT_ID>
        self.policy_name = <POLICY_NAME>

    def add_rule(self):
        #Find minimum current priority
        current_policy = cloud_armor.get_policy(self)
        current_rules = current_policy['rules']
        rule_priorities = []
        for rule in current_rules:
            rule_priorities.append(rule['priority'])
        priority = int(min(rule_priorities)) - 1

        body = {
            "description": "{}".format(self.domain),
            "priority": priority,
            "match": {
                "expr": {
                    "expression": "request.headers['referer']=="{}"".format(
                        self.domain)
                }
            },
            "action": "allow",
            "preview": False,
            "kind": "compute#securityPolicy"
        }
        try:
            policies = compute.securityPolicies()
            rule = policies.addRule(project=self.project_id,
                                    securityPolicy=self.policy_name,
                                    body=body
                                    ).execute()
            return rule
        except Exception as err:
            for i in range(0, len(err.args)):
                logging.error(err.args[i])
                pprint(err.args[i])
            print("==Policy Rule Failed to Add==")
            raise Exception("Policy Rule Failed to Add")

    def get_policy(self):
        try:
            policies = compute.securityPolicies()
            policy = policies.get(project=self.project_id,
                                  securityPolicy=self.policy_name
                                  ).execute()
            return policy
        except Exception as err:
            for i in range(err.args):
                logging.error(err.args[i])
                pprint(err.args[i])
            print("==Failed to Fetch Policy==")
            raise Exception("Failed to Fetch Policy")