如果任务 1 失败,如何在 运行 时间添加任务

How to add task at run time if task 1 is failed

如果任务 1 成功,我想执行任务 2 如果任务 1 失败,我想 运行 任务 3,如果需要,我想分配另一个流程。

基本上我想 运行 在没有 ssh 操作符的气流中执行条件任务。

from airflow import DAG
from airflow.operators import PythonOperator,BranchPythonOperator
from airflow.operators import BashOperator
from datetime import datetime, timedelta
from airflow.models import Variable


def t2_error_task(context):
    instance = context['task_instance']
    if instance.task_id == "performExtract":
        print ("Please implement something over this")
        task_3 = PythonOperator(
            task_id='performJoin1',
            python_callable=performJoin1,  # maybe main?
            dag = dag
        )
        dag.add_task(task_3)
with DAG(
    'manageWorkFlow',
    catchup=False,
    default_args={
        'owner': 'Mannu',
    'start_date': datetime(2018, 4, 13),
        'schedule_interval':None,
        'depends_on_past': False,
    },
) as dag:
    task_1 = PythonOperator(
        task_id='performExtract',
        python_callable=performExtract,
        on_failure_callback=t2_error_task,
        depends_on_past=True
    )
    task_2 = PythonOperator(
        task_id='printSchemas',
        depends_on_past=True,
        python_callable=printSchemaAll,  # maybe main?
    )
    task_2.set_upstream(task_1)

A​​irflow 不支持基于执行时状态动态添加任务。为了获得所需的行为,您应该将 task_3 添加到您的 dag,但将其 trigger_rule 更改为 all_failed。在这种情况下,任务将在 task_1 成功时被标记为已跳过,但在失败时将被执行。