AIrflow 中的 task_instance.xcom_pull 是什么?
What is task_instance.xcom_pull in AIrflow?
我正在尝试 运行 通过 Airflow 进行 EMR,并找到了上面写着
的示例
step_adder = EmrAddStepsOperator(
task_id='add_steps',
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',
steps=SPARK_STEPS,
)
step_checker = EmrStepSensor(
task_id='watch_step',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
aws_conn_id='aws_default',
)
什么是 job_flow_id ={{ task_instance.xcom_pull('create_job_flow', key='return_value') }}
这告诉我什么?
谢谢,
习
在 Airflow 中,任务无法共享数据,但可以共享元数据。这是通过一个任务将记录写入数据库中的 Xcom table 而其他任务读取它来完成的。
task_instance.xcom_pull('create_job_flow', key='return_value'))
表示:
- 前往Xcom table
- 找到与此 DagRun 匹配的行并且
task_id='create_job_flow'
- return条目保存在
key='return_value'
下
{{ }}
是 Jinja 引擎的语法,意思是“打印”值。这是必需的,因为您要查找的值仅在 运行 时间内存在。 create_job_flow
任务必须 运行 并将值保存到数据库中,然后 add_steps
任务才能读取该值。
实际上,这意味着 create_job_flow
任务正在创建 EMR 实例并将 instance/machine id 保存到 Xcom table。下一个任务是 add_steps
,这意味着您要向机器提交步骤 - 为此您需要机器 ID,因此您必须从 Xcom table 读取(拉取)值。每个 DagRun 的值都会不同,因为每个 DagRun 都会创建一个新机器。
我正在尝试 运行 通过 Airflow 进行 EMR,并找到了上面写着
的示例 step_adder = EmrAddStepsOperator(
task_id='add_steps',
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',
steps=SPARK_STEPS,
)
step_checker = EmrStepSensor(
task_id='watch_step',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
aws_conn_id='aws_default',
)
什么是 job_flow_id ={{ task_instance.xcom_pull('create_job_flow', key='return_value') }}
这告诉我什么?
谢谢, 习
在 Airflow 中,任务无法共享数据,但可以共享元数据。这是通过一个任务将记录写入数据库中的 Xcom table 而其他任务读取它来完成的。
task_instance.xcom_pull('create_job_flow', key='return_value'))
表示:
- 前往Xcom table
- 找到与此 DagRun 匹配的行并且
task_id='create_job_flow'
- return条目保存在
key='return_value'
下
{{ }}
是 Jinja 引擎的语法,意思是“打印”值。这是必需的,因为您要查找的值仅在 运行 时间内存在。 create_job_flow
任务必须 运行 并将值保存到数据库中,然后 add_steps
任务才能读取该值。
实际上,这意味着 create_job_flow
任务正在创建 EMR 实例并将 instance/machine id 保存到 Xcom table。下一个任务是 add_steps
,这意味着您要向机器提交步骤 - 为此您需要机器 ID,因此您必须从 Xcom table 读取(拉取)值。每个 DagRun 的值都会不同,因为每个 DagRun 都会创建一个新机器。