AIrflow 中的 task_instance.xcom_pull 是什么?

What is task_instance.xcom_pull in AIrflow?

我正在尝试 运行 通过 Airflow 进行 EMR,并找到了上面写着

的示例
 step_adder = EmrAddStepsOperator(
        task_id='add_steps',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',
        steps=SPARK_STEPS,
    )

    step_checker = EmrStepSensor(
        task_id='watch_step',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
        aws_conn_id='aws_default',
    )

什么是 job_flow_id ={{ task_instance.xcom_pull('create_job_flow', key='return_value') }}

这告诉我什么?

谢谢, 习

在 Airflow 中,任务无法共享数据,但可以共享元数据。这是通过一个任务将记录写入数据库中的 Xcom table 而其他任务读取它来完成的。

task_instance.xcom_pull('create_job_flow', key='return_value')) 表示:

  1. 前往Xcom table
  2. 找到与此 DagRun 匹配的行并且 task_id='create_job_flow'
  3. return条目保存在key='return_value'

{{ }} 是 Jinja 引擎的语法,意思是“打印”值。这是必需的,因为您要查找的值仅在 运行 时间内存在。 create_job_flow 任务必须 运行 并将值保存到数据库中,然后 add_steps 任务才能读取该值。 实际上,这意味着 create_job_flow 任务正在创建 EMR 实例并将 instance/machine id 保存到 Xcom table。下一个任务是 add_steps,这意味着您要向机器提交步骤 - 为此您需要机器 ID,因此您必须从 Xcom table 读取(拉取)值。每个 DagRun 的值都会不同,因为每个 DagRun 都会创建一个新机器。