设置 Cloud Function 以在 Terraform 中触发 Cloud Composer DAG
Setup Cloud Function to trigger Cloud Composer DAG in Terraform
我有一个云作曲家环境:
resource "google_composer_environment" "default" {
name = "default"
region = "us-central1"
config {
node_count = 5
node_config {
zone = "us-central1-a"
machine_type = "n1-standard-2"
}
}
}
以及我用来触发该环境的云函数
resource "google_cloudfunctions_function" "trigger_dag" {
name = "trigger_dag"
runtime = "python37"
labels = {
"deployment-tool" = "terraform"
}
event_trigger {
event_type = "google.pubsub.topic.publish"
resource = "projects/${var.project_id}/topics/trigger_dag"
}
entry_point = "trigger_dag"
environment_variables = {
"AIRFLOW_URI" = google_composer_environment.default.config.0.airflow_uri
}
source_repository {
url = local.repo_url
}
timeouts {}
depends_on = [google_composer_environment.default]
}
但是,我还需要为云函数提供一个 IAP 客户端 ID,以便在调用 DAG 时使用:https://cloud.google.com/composer/docs/how-to/using/triggering-with-gcf#getting_the_client_id
在 Terraform 中,有没有办法让 运行 python 脚本在重新创建 DAG 时生成客户端 ID(因此网络服务器发生变化,旧 ID 无效),并将该 ID 设置为云功能的环境变量?
我们使用 external data source 到 运行 python 脚本和 return 客户端 ID 使其工作。
data "external" "composer_iap_client_id" {
program = ["python", "${path.module}/iap_client.py"]
query = {
airflow_uri = google_composer_environment.default.config.0.airflow_uri
}
}
iap_client.py
import json
import sys
import urllib.parse
import requests
def get_iap_client_id(airflow_uri: str) -> str:
redirect_response = requests.get(airflow_uri, allow_redirects=False)
redirect_location = redirect_response.headers['location']
# Extract the client_id query parameter from the redirect.
parsed = urllib.parse.urlparse(redirect_location)
query_string = urllib.parse.parse_qs(parsed.query)
return query_string['client_id'][0]
def main() -> None:
json_input = json.load(sys.stdin)
client_id = get_iap_client_id(json_input['airflow_uri'])
json.dump({'iap_client_id': client_id}, sys.stdout)
if __name__ == '__main__':
main()
更新云函数以将 iap 客户端 ID 作为环境变量提取:
...
environment_variables = {
"AIRFLOW_URI" = google_composer_environment.default.config.0.airflow_uri
"IAP_CLIENT_ID" = data.external.composer_iap_client_id.result.iap_client_id
}
...
我有一个云作曲家环境:
resource "google_composer_environment" "default" {
name = "default"
region = "us-central1"
config {
node_count = 5
node_config {
zone = "us-central1-a"
machine_type = "n1-standard-2"
}
}
}
以及我用来触发该环境的云函数
resource "google_cloudfunctions_function" "trigger_dag" {
name = "trigger_dag"
runtime = "python37"
labels = {
"deployment-tool" = "terraform"
}
event_trigger {
event_type = "google.pubsub.topic.publish"
resource = "projects/${var.project_id}/topics/trigger_dag"
}
entry_point = "trigger_dag"
environment_variables = {
"AIRFLOW_URI" = google_composer_environment.default.config.0.airflow_uri
}
source_repository {
url = local.repo_url
}
timeouts {}
depends_on = [google_composer_environment.default]
}
但是,我还需要为云函数提供一个 IAP 客户端 ID,以便在调用 DAG 时使用:https://cloud.google.com/composer/docs/how-to/using/triggering-with-gcf#getting_the_client_id
在 Terraform 中,有没有办法让 运行 python 脚本在重新创建 DAG 时生成客户端 ID(因此网络服务器发生变化,旧 ID 无效),并将该 ID 设置为云功能的环境变量?
我们使用 external data source 到 运行 python 脚本和 return 客户端 ID 使其工作。
data "external" "composer_iap_client_id" {
program = ["python", "${path.module}/iap_client.py"]
query = {
airflow_uri = google_composer_environment.default.config.0.airflow_uri
}
}
iap_client.py
import json
import sys
import urllib.parse
import requests
def get_iap_client_id(airflow_uri: str) -> str:
redirect_response = requests.get(airflow_uri, allow_redirects=False)
redirect_location = redirect_response.headers['location']
# Extract the client_id query parameter from the redirect.
parsed = urllib.parse.urlparse(redirect_location)
query_string = urllib.parse.parse_qs(parsed.query)
return query_string['client_id'][0]
def main() -> None:
json_input = json.load(sys.stdin)
client_id = get_iap_client_id(json_input['airflow_uri'])
json.dump({'iap_client_id': client_id}, sys.stdout)
if __name__ == '__main__':
main()
更新云函数以将 iap 客户端 ID 作为环境变量提取:
...
environment_variables = {
"AIRFLOW_URI" = google_composer_environment.default.config.0.airflow_uri
"IAP_CLIENT_ID" = data.external.composer_iap_client_id.result.iap_client_id
}
...