如何跳过 Airflow 上的任务?

How to skip tasks on Airflow?

我想了解 Airflow 是否支持跳过 DAG 中的任务以进行临时执行?

假设我的 DAG 图是这样的: 任务 1 > 任务 2 > 任务 3 > 任务 4

我想从任务 3 手动启动我的 DAG,最好的方法是什么?

我已阅读有关 ShortCircuitOperator 的内容,但我正在寻找更多临时解决方案,一旦触发执行即可应用。

谢谢!

您可以合并 SkipMixin that the ShortCircuitOperator uses under the hood 以跳过下游任务。

from airflow.models import BaseOperator, SkipMixin
from airflow.utils.decorators import apply_defaults


class mySkippingOperator(BaseOperator, SkipMixin)
    
    @apply_defaults
    def __init__(self,
                 condition,
                 *args,
                 **kwargs):
        super().__init__(*args, **kwargs)
        self.condition = condition
    
    def execute(self, context):

        if self.condition:
           self.log.info('Proceeding with downstream tasks...')
           return

        self.log.info('Skipping downstream tasks...')

        downstream_tasks = context['task'].get_flat_relatives(upstream=False)
       
        self.log.debug("Downstream task_ids %s", downstream_tasks)

        if downstream_tasks:
            self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks)

        self.log.info("Done.")

根据 Apache Airflow 的构建方式,您可以编写 logic/branches 来确定要 运行 的任务。

但是

您不能从中间的任何任务开始执行任务。顺序完全由依赖管理定义(upstream/downstrem)。

但是,如果您使用的是 celery 运算符,则可以忽略 运行 中的所有依赖项,并让 airflow 随意执行任务。话又说回来,这不会阻止上游的任务被调度。

马燕, 有一个非常肮脏但非常简单且最明显的解决方案。几乎30秒。但是,只有当您可以轻松地更新 PROD 中的代码并且能够临时阻止其他人访问 运行 DAG 时才有可能。 只评论你想跳过的任务

'#task1 > task2 >

任务 3 > 任务 4

一个更严肃但需要更多努力的解决方案可能是基于 start_from_task 的参数动态创建 DAG,在这种情况下,将使用此参数构建依赖项。可以使用 Admin==>Variables 菜单在 UI 中更改参数。您可能还可以使用前一个变量的导出时间的另一个变量。例如- DAG 将忽略 task1 和 task2,直到 14:05:30 之后 运行 整个 DAG。

是的,您只需单击任务 3。切换 运行 按钮右侧的复选框以忽略依赖项,然后单击 运行。

是的,您可以通过另一个临时基础来完成此操作。 不知何故找到了!!

您需要引发 AirflowSkipException

from airflow.exceptions import AirflowSkipException
def execute():
    if condition:
        raise AirflowSkipException

task = PythonOperator(task_id='task', python_callable=execute, dag=some_dag)