我们可以将 x_com 变量作为参数传递给 DAG 中的下一个任务吗?

Can we pass x_com variable to the next task in DAG as a parameter?

请考虑这种情况:

我在 cloud composer 上有一个 cloud Dag,可以触发一个 cloud 函数。该函数命中 api,然后将 table 存储在 GCS 中。现在我的 Airflow DAG(使用 Cloud Composer)触发下一阶段,即从 GCS 获取 table 并推送到 BQ 的 Dataproc 作业,但是当我触发我的 Dataproc 工作流模板时,我传递的参数是table 来自 dag 本身以及我想从 x_com 中选择的参数。

这是一个代码片段,它抛出一个未定义的错误

dataproc_job = dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(
# The task id of your job
task_id="dataproc_job",
# The template id of your workflow
template_id="newwf1",
project_id='#######',
region="us-central1",
parameters={"TABLE_NAME":ti.xcom_pull(task_ids=simple_http}
)

如何解决此错误并将 x_com 值作为参数传递给我在 DAG 中的下一步?

假设 parameters 参数可以模板化(在 DataprocWorkflowTemplateInstantiateOperator 中也被列为 templated_field),您可以使用 Jinja 表达式来访问 XCom 值。

DataprocWorkflowTemplateInstantiateOperator(
    # The task id of your job
    task_id="dataproc_job",
    # The template id of your workflow
    template_id="newwf1",
    project_id='#######',
    region="us-central1", 
    parameters={"TABLE_NAME": "{{ ti.xcom_pull(task_ids='simple_http' }}"} 
)

更多关于 Airflow 中的 Jinja 模板 here as well as using XComs with templates mentioned in this doc

顺便说一句,该操作员看起来像一个非常古老的 Airflow 1 操作员。如果可以,我强烈建议升级到 Airflow 2。有无数的功能和性能改进,使您的 Airflow 和管道执行体验更好。