我们可以将 x_com 变量作为参数传递给 DAG 中的下一个任务吗?
Can we pass x_com variable to the next task in DAG as a parameter?
请考虑这种情况:
我在 cloud composer 上有一个 cloud Dag,可以触发一个 cloud 函数。该函数命中 api,然后将 table 存储在 GCS 中。现在我的 Airflow DAG(使用 Cloud Composer)触发下一阶段,即从 GCS 获取 table 并推送到 BQ 的 Dataproc 作业,但是当我触发我的 Dataproc 工作流模板时,我传递的参数是table 来自 dag 本身以及我想从 x_com 中选择的参数。
这是一个代码片段,它抛出一个未定义的错误
dataproc_job = dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(
# The task id of your job
task_id="dataproc_job",
# The template id of your workflow
template_id="newwf1",
project_id='#######',
region="us-central1",
parameters={"TABLE_NAME":ti.xcom_pull(task_ids=simple_http}
)
如何解决此错误并将 x_com 值作为参数传递给我在 DAG 中的下一步?
假设 parameters
参数可以模板化(在 DataprocWorkflowTemplateInstantiateOperator
中也被列为 templated_field
),您可以使用 Jinja 表达式来访问 XCom
值。
DataprocWorkflowTemplateInstantiateOperator(
# The task id of your job
task_id="dataproc_job",
# The template id of your workflow
template_id="newwf1",
project_id='#######',
region="us-central1",
parameters={"TABLE_NAME": "{{ ti.xcom_pull(task_ids='simple_http' }}"}
)
更多关于 Airflow 中的 Jinja 模板 here as well as using XComs
with templates mentioned in this doc。
顺便说一句,该操作员看起来像一个非常古老的 Airflow 1 操作员。如果可以,我强烈建议升级到 Airflow 2。有无数的功能和性能改进,使您的 Airflow 和管道执行体验更好。
请考虑这种情况:
我在 cloud composer 上有一个 cloud Dag,可以触发一个 cloud 函数。该函数命中 api,然后将 table 存储在 GCS 中。现在我的 Airflow DAG(使用 Cloud Composer)触发下一阶段,即从 GCS 获取 table 并推送到 BQ 的 Dataproc 作业,但是当我触发我的 Dataproc 工作流模板时,我传递的参数是table 来自 dag 本身以及我想从 x_com 中选择的参数。
这是一个代码片段,它抛出一个未定义的错误
dataproc_job = dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(
# The task id of your job
task_id="dataproc_job",
# The template id of your workflow
template_id="newwf1",
project_id='#######',
region="us-central1",
parameters={"TABLE_NAME":ti.xcom_pull(task_ids=simple_http}
)
如何解决此错误并将 x_com 值作为参数传递给我在 DAG 中的下一步?
假设 parameters
参数可以模板化(在 DataprocWorkflowTemplateInstantiateOperator
中也被列为 templated_field
),您可以使用 Jinja 表达式来访问 XCom
值。
DataprocWorkflowTemplateInstantiateOperator(
# The task id of your job
task_id="dataproc_job",
# The template id of your workflow
template_id="newwf1",
project_id='#######',
region="us-central1",
parameters={"TABLE_NAME": "{{ ti.xcom_pull(task_ids='simple_http' }}"}
)
更多关于 Airflow 中的 Jinja 模板 here as well as using XComs
with templates mentioned in this doc。
顺便说一句,该操作员看起来像一个非常古老的 Airflow 1 操作员。如果可以,我强烈建议升级到 Airflow 2。有无数的功能和性能改进,使您的 Airflow 和管道执行体验更好。