如何将 Cloud 运行 容器执行到 Airflow DAG 中?
How to execute Cloud Run containers into an Airflow DAG?
我正在尝试 运行 一个带有 Cloud 运行 的容器作为 Airflow 的 DAG 的任务。
似乎没有 Cloud运行Operator 或类似的东西,我在文档中找不到任何东西(Cloud 运行 和 Airflow 之一)。
有人处理过这个问题吗?
如果是,我如何 运行 一个带有 Cloud 运行 的容器并处理 xcom?
提前致谢!!
据我所知,当容器部署到云端时 运行 它会自动侦听可能要发送的请求。请参阅 document 以供参考。
相反,您可以发送请求以访问已部署的容器。您可以使用以下代码执行此操作。
这个 DAG 有三个任务 print_token
、task_get_op
和 process_data
。
print_token
打印对已部署的 Cloud 运行 容器的请求进行身份验证所需的身份令牌。我使用“xcom_pull”获取“BashOperator”的输出并将身份验证令牌分配给 token
,因此这可用于对您将执行的 HTTP 请求进行身份验证。
task_get_op
在连接 cloud_run
上执行 GET(这仅包含云 运行 端点)并定义 headers 'Authorization': 'Bearer ' + token
用于身份验证。
process_data
在“task_get_op”上执行“xcom_pull”以获取输出并使用 PythonOperator 打印它。
import datetime
import airflow
from airflow.operators import bash
from airflow.operators import python
from airflow.providers.http.operators.http import SimpleHttpOperator
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': YESTERDAY,
}
with airflow.DAG(
'composer_http_request',
'catchup=False',
default_args=default_args,
schedule_interval=datetime.timedelta(days=1)) as dag:
print_token = bash.BashOperator(
task_id='print_token',
bash_command='gcloud auth print-identity-token "--audiences=https://hello-world-fri824-ab.c.run.app"' # The end point of the deployed Cloud Run container
)
token = "{{ task_instance.xcom_pull(task_ids='print_token') }}" # gets output from 'print_token' task
task_get_op = SimpleHttpOperator(
task_id='get_op',
method='GET',
http_conn_id='cloud_run',
headers={'Authorization': 'Bearer ' + token },
)
def process_data_from_http(**kwargs):
ti = kwargs['ti']
http_data = ti.xcom_pull(task_ids='get_op')
print(http_data)
process_data = python.PythonOperator(
task_id='process_data_from_http',
python_callable=process_data_from_http,
provide_context=True
)
print_token >> task_get_op >> process_data
cloud_run
连接:
输出(图形):
print_token 日志:
task_get_op 日志:
注意:我正在使用 Cloud Composer 1.17.7 和 Airflow 2.0.2 并安装了 apache-airflow-providers-http
以便能够使用 SimpleHttpOperator.
我正在尝试 运行 一个带有 Cloud 运行 的容器作为 Airflow 的 DAG 的任务。
似乎没有 Cloud运行Operator 或类似的东西,我在文档中找不到任何东西(Cloud 运行 和 Airflow 之一)。
有人处理过这个问题吗? 如果是,我如何 运行 一个带有 Cloud 运行 的容器并处理 xcom?
提前致谢!!
据我所知,当容器部署到云端时 运行 它会自动侦听可能要发送的请求。请参阅 document 以供参考。
相反,您可以发送请求以访问已部署的容器。您可以使用以下代码执行此操作。
这个 DAG 有三个任务 print_token
、task_get_op
和 process_data
。
print_token
打印对已部署的 Cloud 运行 容器的请求进行身份验证所需的身份令牌。我使用“xcom_pull”获取“BashOperator”的输出并将身份验证令牌分配给token
,因此这可用于对您将执行的 HTTP 请求进行身份验证。task_get_op
在连接cloud_run
上执行 GET(这仅包含云 运行 端点)并定义 headers'Authorization': 'Bearer ' + token
用于身份验证。process_data
在“task_get_op”上执行“xcom_pull”以获取输出并使用 PythonOperator 打印它。
import datetime
import airflow
from airflow.operators import bash
from airflow.operators import python
from airflow.providers.http.operators.http import SimpleHttpOperator
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': YESTERDAY,
}
with airflow.DAG(
'composer_http_request',
'catchup=False',
default_args=default_args,
schedule_interval=datetime.timedelta(days=1)) as dag:
print_token = bash.BashOperator(
task_id='print_token',
bash_command='gcloud auth print-identity-token "--audiences=https://hello-world-fri824-ab.c.run.app"' # The end point of the deployed Cloud Run container
)
token = "{{ task_instance.xcom_pull(task_ids='print_token') }}" # gets output from 'print_token' task
task_get_op = SimpleHttpOperator(
task_id='get_op',
method='GET',
http_conn_id='cloud_run',
headers={'Authorization': 'Bearer ' + token },
)
def process_data_from_http(**kwargs):
ti = kwargs['ti']
http_data = ti.xcom_pull(task_ids='get_op')
print(http_data)
process_data = python.PythonOperator(
task_id='process_data_from_http',
python_callable=process_data_from_http,
provide_context=True
)
print_token >> task_get_op >> process_data
cloud_run
连接:
输出(图形):
print_token 日志:
task_get_op 日志:
注意:我正在使用 Cloud Composer 1.17.7 和 Airflow 2.0.2 并安装了 apache-airflow-providers-http
以便能够使用 SimpleHttpOperator.