使用 Cloud Composer 上的 KubernetesPodOperator 通过 Cloud Functions 将变量传递到容器
Passing variables through Cloud Functions to a container using KubernetesPodOperator on Cloud Composer
我正在尝试从 Google Cloud Functions 上的后台函数运行中获取 event 和 context 变量数据,并将这些值传递给在 Cloud Composer/Airflow 上运行的 KubernetesPodOperator。
代码的第一部分是我的云函数,它触发了一个名为 gcs_to_pubsub_topic_dag 的 dag,我想传递和访问的是 json 中的数据,特别是 “conf”:事件数据。
#!/usr/bin/env python
# coding: utf-8
from google.auth.transport.requests import Request
from google.oauth2 import id_token
import requests
IAM_SCOPE = 'https://www.googleapis.com/auth/iam'
OAUTH_TOKEN_URI = 'https://www.googleapis.com/oauth2/v4/token'


def trigger_dag(event, context=None):
    """Trigger the Composer DAG through the IAP-protected Airflow webserver.

    Args:
        event: Cloud Functions background-function event payload; forwarded
            verbatim as the dag_run ``conf`` so it is readable inside Airflow.
        context: Cloud Functions event context (not used by the request).
    """
    client_id = '###############.apps.googleusercontent.com'
    webserver_id = '###############'
    # The name of the DAG you wish to trigger.
    dag_name = 'gcs_to_pubsub_topic_dag'
    # Experimental REST endpoint of the Composer Airflow webserver.
    webserver_url = (
        'https://'
        + webserver_id
        + '.appspot.com/api/experimental/dags/'
        + dag_name
        + '/dag_runs'
    )
    print(f' This is my webserver url: {webserver_url}')
    # POST through IAP; the "conf" value becomes dag_run.conf in Airflow.
    make_iap_request(
        webserver_url, client_id, method='POST',
        json={"conf": event, "replace_microseconds": 'false'})
def make_iap_request(url, client_id, method='GET', **kwargs):
    """Issue an HTTP request to an IAP-protected URL.

    Args:
        url: The IAP-protected endpoint to call.
        client_id: OAuth client ID used as the audience of the ID token.
        method: HTTP verb, default 'GET'.
        **kwargs: Extra keyword arguments forwarded to ``requests.request``.

    Returns:
        The response body text on HTTP 200.

    Raises:
        Exception: On 403 (service account lacks IAP permission) or any
            other non-200 status code.
    """
    # Avoid hanging forever if the webserver is slow or unreachable.
    if 'timeout' not in kwargs:
        kwargs['timeout'] = 90
    # Fetch an OpenID Connect token whose audience is the IAP client_id.
    google_open_id_connect_token = id_token.fetch_id_token(Request(), client_id)
    resp = requests.request(
        method, url,
        headers={'Authorization': 'Bearer {}'.format(
            google_open_id_connect_token)}, **kwargs)
    if resp.status_code == 403:
        raise Exception('Service account does not have permission to '
                        'access the IAP-protected application.')
    elif resp.status_code != 200:
        raise Exception(
            'Bad response from application: {!r} / {!r} / {!r}'.format(
                resp.status_code, resp.headers, resp.text))
    else:
        return resp.text
def main(event=None, context=None):
    """Cloud Functions entry point: trigger the DAG for the incoming event.

    Args:
        event: Background-function event data. Defaults to None so the
            module can be executed directly (the old ``main()`` call in the
            __main__ guard raised TypeError because ``event`` was required).
        context: Event context, forwarded to trigger_dag.

    Returns:
        A fixed success message.
    """
    # Forward the real context instead of discarding it (was context=None).
    trigger_dag(event, context)
    return 'Script has run without errors !!'


if __name__ == "__main__":
    main()
被触发的 dag 会运行下面这段 KubernetesPodOperator 代码:
kubernetes_pod_operator.KubernetesPodOperator(
    # The ID specified for the task.
    task_id=TASK_ID,
    # Name of the task you want to run, used to generate the Pod ID.
    name=TASK_ID,
    # Entrypoint of the container; if not specified the Docker image's
    # entrypoint is used. The cmds parameter is templated.
    cmds=['python3', 'execution_file.py'],
    # The namespace to run within Kubernetes; default namespace is `default`.
    namespace=KUBERNETES_NAMESPACE,
    # Location of the Docker image on Google Container Registry.
    image=f'eu.gcr.io/{GCP_PROJECT_ID}/{CONTAINER_ID}:{IMAGE_VERSION}',
    # Always pull the image before running it.
    image_pull_policy='Always',
    # env_vars is a templated field, so the Jinja expression must be a
    # STRING. The original bare {{ dag_run.conf }} is parsed by Python as a
    # set literal and never reaches the template engine — which is exactly
    # why DAG_CONF arrived in the container as None.
    env_vars={'GCP_PROJECT_ID': GCP_PROJECT_ID,
              'DAG_CONF': '{{ dag_run.conf }}'},
    dag=dag)
最后我想让 DAG_CONF 在调用的容器图像 execution_file.py 脚本中打印 :
#!/usr/bin/env python
# coding: utf-8
# Container entrypoint: prints the environment variables injected by the
# KubernetesPodOperator so we can verify GCP_PROJECT_ID and DAG_CONF
# actually reach the pod.
from gcs_unzip_function import main as gcs_unzip_function
from gcs_to_pubsub_topic import main as gcs_to_pubsub_topic
from os import listdir, getenv
# Both values come from the operator's env_vars mapping; getenv returns
# None when the variable was never set on the pod (the observed symptom).
GCP_PROJECT_ID = getenv('GCP_PROJECT_ID')
DAG_CONF = getenv('DAG_CONF')
print('Test run')
print(GCP_PROJECT_ID)
print (f'This is my dag conf {DAG_CONF}')
print(type(DAG_CONF))
目前代码触发 dag 和 returns:
Test run
GCP_PROJECT_ID (this is set in the airflow environment variables)
This is my dag conf None
&lt;class 'NoneType'&gt;
我希望 DAG_CONF 通过的地方
我找到了一种解决方法,可以在 KubernetesPodOperator 运行的容器内访问触发 dag 的对象的相关数据。
post request code 保持不变,但我想强调的是,您可以将任何内容传递给字典中的 conf 元素。
make_iap_request(
webserver_url, client_id, method='POST', json={"conf": event,
"replace_microseconds": 'false'})
dag 代码要求您创建自定义 class 来评估 dag_run 和 .conf 元素,然后参数访问我们从发布请求发送的 json。
(这是我一边阅读相关文章一边实现的。)
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
class CustomKubernetesPodOperator(KubernetesPodOperator):
    """KubernetesPodOperator that forwards dag_run.conf as a --json argument.

    The conf dict supplied by the triggering POST request is stringified and
    appended to the container arguments; the container entrypoint parses it
    back with ast.literal_eval.
    """

    def execute(self, context):
        # str() of the conf dict; the container reverses this with
        # ast.literal_eval, so repr-style quoting is intentional.
        # (Renamed from `json`, which shadowed the builtin module name.)
        conf_arg = '--json={}'.format(context['dag_run'].conf)
        # Rebuild the list, dropping any previous --json flag, instead of
        # extending self.arguments in place: in-place mutation would stack
        # duplicate --json arguments if execute() runs again on task retry.
        self.arguments = [a for a in self.arguments
                          if not a.startswith('--json=')] + [conf_arg]
        super().execute(context)
CustomKubernetesPodOperator(
    # The ID specified for the task.
    task_id=TASK_ID,
    # Name of the task you want to run, used to generate the Pod ID.
    name=TASK_ID,
    # Entrypoint of the container; if not specified the Docker image's
    # entrypoint is used. The cmds parameter is templated.
    cmds=['python3', 'execution_file.py'],
    # The namespace to run within Kubernetes; default namespace is `default`.
    namespace=KUBERNETES_NAMESPACE,
    # Location of the Docker image on Google Container Registry.
    image=f'eu.gcr.io/{GCP_PROJECT_ID}/{CONTAINER_ID}:{IMAGE_VERSION}',
    # Always pull the image before running it.
    image_pull_policy='Always',
    # DAG_CONF is no longer passed via env_vars; the custom execute()
    # injects dag_run.conf as a --json container argument instead.
    env_vars={'GCP_PROJECT_ID': GCP_PROJECT_ID},
    dag=dag)
容器中运行的代码先用 argparse 把参数以字符串形式读进来,再用 ast.literal_eval 将其还原成字典,以便在代码中访问:
import ast
import argparse
from os import listdir, getenv


def main(object_metadata_dict):
    """Print the object metadata forwarded from the Cloud Function.

    Args:
        object_metadata_dict: dict expected to carry at least 'bucket' and
            'name' keys (the GCS event payload passed through dag_run.conf).

    Returns:
        A fixed success message.
    """
    print(f'This is my metadata as a dictionary {object_metadata_dict}')
    # .get() keeps a smoke-test run (empty conf) from raising KeyError.
    print(f'This is my bucket {object_metadata_dict.get("bucket")}')
    print(f'This is my file name {object_metadata_dict.get("name")}')
    return 'Script has run without errors !!'


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Staging to live load process.')
    # Default is a dict literal: the previous default of 'all' crashed
    # ast.literal_eval below (a bare name is not a Python literal), and the
    # original help string was split across lines, which is a SyntaxError.
    parser.add_argument(
        "--json", type=str, dest="json", required=False, default='{}',
        help="Metadata for the triggered object (repr of a dict), derived "
             "from Cloud Functions background functions.")
    args = parser.parse_args()
    # The operator passes str(dag_run.conf); literal_eval safely turns the
    # repr back into a dict (never eval() — the payload is external input).
    object_metadata_dict = ast.literal_eval(args.json)
    main(object_metadata_dict)
我正在尝试从 Google Cloud Functions 上的后台函数运行中获取 event 和 context 变量数据,并将这些值传递给在 Cloud Composer/Airflow 上运行的 KubernetesPodOperator。
代码的第一部分是我的云函数,它触发了一个名为 gcs_to_pubsub_topic_dag 的 dag,我想传递和访问的是 json 中的数据,特别是 “conf”:事件数据。
#!/usr/bin/env python
# coding: utf-8
from google.auth.transport.requests import Request
from google.oauth2 import id_token
import requests
IAM_SCOPE = 'https://www.googleapis.com/auth/iam'
OAUTH_TOKEN_URI = 'https://www.googleapis.com/oauth2/v4/token'


def trigger_dag(event, context=None):
    """Trigger the Composer DAG through the IAP-protected Airflow webserver.

    Args:
        event: Cloud Functions background-function event payload; forwarded
            verbatim as the dag_run ``conf`` so it is readable inside Airflow.
        context: Cloud Functions event context (not used by the request).
    """
    client_id = '###############.apps.googleusercontent.com'
    webserver_id = '###############'
    # The name of the DAG you wish to trigger.
    dag_name = 'gcs_to_pubsub_topic_dag'
    # Experimental REST endpoint of the Composer Airflow webserver.
    webserver_url = (
        'https://'
        + webserver_id
        + '.appspot.com/api/experimental/dags/'
        + dag_name
        + '/dag_runs'
    )
    print(f' This is my webserver url: {webserver_url}')
    # POST through IAP; the "conf" value becomes dag_run.conf in Airflow.
    make_iap_request(
        webserver_url, client_id, method='POST',
        json={"conf": event, "replace_microseconds": 'false'})
def make_iap_request(url, client_id, method='GET', **kwargs):
    """Issue an HTTP request to an IAP-protected URL.

    Args:
        url: The IAP-protected endpoint to call.
        client_id: OAuth client ID used as the audience of the ID token.
        method: HTTP verb, default 'GET'.
        **kwargs: Extra keyword arguments forwarded to ``requests.request``.

    Returns:
        The response body text on HTTP 200.

    Raises:
        Exception: On 403 (service account lacks IAP permission) or any
            other non-200 status code.
    """
    # Avoid hanging forever if the webserver is slow or unreachable.
    if 'timeout' not in kwargs:
        kwargs['timeout'] = 90
    # Fetch an OpenID Connect token whose audience is the IAP client_id.
    google_open_id_connect_token = id_token.fetch_id_token(Request(), client_id)
    resp = requests.request(
        method, url,
        headers={'Authorization': 'Bearer {}'.format(
            google_open_id_connect_token)}, **kwargs)
    if resp.status_code == 403:
        raise Exception('Service account does not have permission to '
                        'access the IAP-protected application.')
    elif resp.status_code != 200:
        raise Exception(
            'Bad response from application: {!r} / {!r} / {!r}'.format(
                resp.status_code, resp.headers, resp.text))
    else:
        return resp.text
def main(event=None, context=None):
    """Cloud Functions entry point: trigger the DAG for the incoming event.

    Args:
        event: Background-function event data. Defaults to None so the
            module can be executed directly (the old ``main()`` call in the
            __main__ guard raised TypeError because ``event`` was required).
        context: Event context, forwarded to trigger_dag.

    Returns:
        A fixed success message.
    """
    # Forward the real context instead of discarding it (was context=None).
    trigger_dag(event, context)
    return 'Script has run without errors !!'


if __name__ == "__main__":
    main()
被触发的 dag 会运行下面这段 KubernetesPodOperator 代码:
kubernetes_pod_operator.KubernetesPodOperator(
    # The ID specified for the task.
    task_id=TASK_ID,
    # Name of the task you want to run, used to generate the Pod ID.
    name=TASK_ID,
    # Entrypoint of the container; if not specified the Docker image's
    # entrypoint is used. The cmds parameter is templated.
    cmds=['python3', 'execution_file.py'],
    # The namespace to run within Kubernetes; default namespace is `default`.
    namespace=KUBERNETES_NAMESPACE,
    # Location of the Docker image on Google Container Registry.
    image=f'eu.gcr.io/{GCP_PROJECT_ID}/{CONTAINER_ID}:{IMAGE_VERSION}',
    # Always pull the image before running it.
    image_pull_policy='Always',
    # env_vars is a templated field, so the Jinja expression must be a
    # STRING. The original bare {{ dag_run.conf }} is parsed by Python as a
    # set literal and never reaches the template engine — which is exactly
    # why DAG_CONF arrived in the container as None.
    env_vars={'GCP_PROJECT_ID': GCP_PROJECT_ID,
              'DAG_CONF': '{{ dag_run.conf }}'},
    dag=dag)
最后我想让 DAG_CONF 在调用的容器图像 execution_file.py 脚本中打印 :
#!/usr/bin/env python
# coding: utf-8
# Container entrypoint: prints the environment variables injected by the
# KubernetesPodOperator so we can verify GCP_PROJECT_ID and DAG_CONF
# actually reach the pod.
from gcs_unzip_function import main as gcs_unzip_function
from gcs_to_pubsub_topic import main as gcs_to_pubsub_topic
from os import listdir, getenv
# Both values come from the operator's env_vars mapping; getenv returns
# None when the variable was never set on the pod (the observed symptom).
GCP_PROJECT_ID = getenv('GCP_PROJECT_ID')
DAG_CONF = getenv('DAG_CONF')
print('Test run')
print(GCP_PROJECT_ID)
print (f'This is my dag conf {DAG_CONF}')
print(type(DAG_CONF))
目前代码触发 dag 和 returns:
Test run
GCP_PROJECT_ID (this is set in the airflow environment variables)
This is my dag conf None
&lt;class 'NoneType'&gt;
我希望 DAG_CONF 通过的地方
我找到了一种解决方法,可以在 KubernetesPodOperator 运行的容器内访问触发 dag 的对象的相关数据。
post request code 保持不变,但我想强调的是,您可以将任何内容传递给字典中的 conf 元素。
make_iap_request(
webserver_url, client_id, method='POST', json={"conf": event,
"replace_microseconds": 'false'})
dag 代码要求您创建一个自定义 class,在其中读取 dag_run 的 .conf 元素,并通过容器参数访问我们从 POST 请求发送的 json。(这是我一边阅读相关文章一边实现的。)
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
class CustomKubernetesPodOperator(KubernetesPodOperator):
    """KubernetesPodOperator that forwards dag_run.conf as a --json argument.

    The conf dict supplied by the triggering POST request is stringified and
    appended to the container arguments; the container entrypoint parses it
    back with ast.literal_eval.
    """

    def execute(self, context):
        # str() of the conf dict; the container reverses this with
        # ast.literal_eval, so repr-style quoting is intentional.
        # (Renamed from `json`, which shadowed the builtin module name.)
        conf_arg = '--json={}'.format(context['dag_run'].conf)
        # Rebuild the list, dropping any previous --json flag, instead of
        # extending self.arguments in place: in-place mutation would stack
        # duplicate --json arguments if execute() runs again on task retry.
        self.arguments = [a for a in self.arguments
                          if not a.startswith('--json=')] + [conf_arg]
        super().execute(context)
CustomKubernetesPodOperator(
    # The ID specified for the task.
    task_id=TASK_ID,
    # Name of the task you want to run, used to generate the Pod ID.
    name=TASK_ID,
    # Entrypoint of the container; if not specified the Docker image's
    # entrypoint is used. The cmds parameter is templated.
    cmds=['python3', 'execution_file.py'],
    # The namespace to run within Kubernetes; default namespace is `default`.
    namespace=KUBERNETES_NAMESPACE,
    # Location of the Docker image on Google Container Registry.
    image=f'eu.gcr.io/{GCP_PROJECT_ID}/{CONTAINER_ID}:{IMAGE_VERSION}',
    # Always pull the image before running it.
    image_pull_policy='Always',
    # DAG_CONF is no longer passed via env_vars; the custom execute()
    # injects dag_run.conf as a --json container argument instead.
    env_vars={'GCP_PROJECT_ID': GCP_PROJECT_ID},
    dag=dag)
容器中运行的代码先用 argparse 把参数以字符串形式读进来,再用 ast.literal_eval 将其还原成字典,以便在代码中访问:
import ast
import argparse
from os import listdir, getenv


def main(object_metadata_dict):
    """Print the object metadata forwarded from the Cloud Function.

    Args:
        object_metadata_dict: dict expected to carry at least 'bucket' and
            'name' keys (the GCS event payload passed through dag_run.conf).

    Returns:
        A fixed success message.
    """
    print(f'This is my metadata as a dictionary {object_metadata_dict}')
    # .get() keeps a smoke-test run (empty conf) from raising KeyError.
    print(f'This is my bucket {object_metadata_dict.get("bucket")}')
    print(f'This is my file name {object_metadata_dict.get("name")}')
    return 'Script has run without errors !!'


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Staging to live load process.')
    # Default is a dict literal: the previous default of 'all' crashed
    # ast.literal_eval below (a bare name is not a Python literal), and the
    # original help string was split across lines, which is a SyntaxError.
    parser.add_argument(
        "--json", type=str, dest="json", required=False, default='{}',
        help="Metadata for the triggered object (repr of a dict), derived "
             "from Cloud Functions background functions.")
    args = parser.parse_args()
    # The operator passes str(dag_run.conf); literal_eval safely turns the
    # repr back into a dict (never eval() — the payload is external input).
    object_metadata_dict = ast.literal_eval(args.json)
    main(object_metadata_dict)