如何跳过 Airflow 操作员中的任务?

How to skip task in Airflow operator?

有没有办法让 Airflow 从 PythonOperator 跳过当前任务?例如:

def execute():
    if condition:
        skip_current_task()

task = PythonOperator(task_id='task', python_callable=execute, dag=some_dag)

并且还在 Airflow 中将任务标记为“已跳过”UI?

跳过任务的最简单解决方案:

def execute():
    if condition:
        return

task = PythonOperator(task_id='task', python_callable=execute, dag=some_dag)

不幸的是,它会将任务标记为 DONE

想通了!跳过任务很简单:

def execute():
    if condition:
        raise AirflowSkipException

task = PythonOperator(task_id='task', python_callable=execute, dag=some_dag)