设置 Cloud Function 以在 Terraform 中触发 Cloud Composer DAG

Setup Cloud Function to trigger Cloud Composer DAG in Terraform

我有一个云作曲家环境:

resource "google_composer_environment" "default" {
  name   = "default"
  region = "us-central1"
  config {
    node_count = 5

    node_config {
      zone         = "us-central1-a"
      machine_type = "n1-standard-2"
    }
  }
}

以及我用来触发该环境的云函数

resource "google_cloudfunctions_function" "trigger_dag" {
  name    = "trigger_dag"
  runtime = "python37"
  labels = {
    "deployment-tool" = "terraform"
  }

  event_trigger {
    event_type = "google.pubsub.topic.publish"
    resource = "projects/${var.project_id}/topics/trigger_dag"
  }

  entry_point = "trigger_dag"
  environment_variables = {
    "AIRFLOW_URI" = google_composer_environment.default.config.0.airflow_uri
  }

  source_repository {
    url = local.repo_url
  }

  timeouts {}

  depends_on = [google_composer_environment.default]
}

但是,我还需要为云函数提供一个 IAP 客户端 ID,以便在调用 DAG 时使用:https://cloud.google.com/composer/docs/how-to/using/triggering-with-gcf#getting_the_client_id

在 Terraform 中,有没有办法让 运行 python 脚本在重新创建 DAG 时生成客户端 ID(因此网络服务器发生变化,旧 ID 无效),并将该 ID 设置为云功能的环境变量?

我们使用 external data source 到 运行 python 脚本和 return 客户端 ID 使其工作。

data "external" "composer_iap_client_id" {
  program = ["python", "${path.module}/iap_client.py"]

  query = {
    airflow_uri = google_composer_environment.default.config.0.airflow_uri
  }
}

iap_client.py

import json
import sys
import urllib.parse

import requests


def get_iap_client_id(airflow_uri: str) -> str:
    redirect_response = requests.get(airflow_uri, allow_redirects=False)
    redirect_location = redirect_response.headers['location']

    # Extract the client_id query parameter from the redirect.
    parsed = urllib.parse.urlparse(redirect_location)
    query_string = urllib.parse.parse_qs(parsed.query)
    return query_string['client_id'][0]


def main() -> None:
    json_input = json.load(sys.stdin)
    client_id = get_iap_client_id(json_input['airflow_uri'])
    json.dump({'iap_client_id': client_id}, sys.stdout)


if __name__ == '__main__':
    main()

更新云函数以将 iap 客户端 ID 作为环境变量提取:

  ...
  environment_variables = {
    "AIRFLOW_URI" = google_composer_environment.default.config.0.airflow_uri
    "IAP_CLIENT_ID" = data.external.composer_iap_client_id.result.iap_client_id
  }
  ...