如何跳过 Airflow 上的任务?
How to skip tasks on Airflow?
我想了解 Airflow 是否支持跳过 DAG 中的任务以进行临时执行?
假设我的 DAG 图是这样的:
任务 1 > 任务 2 > 任务 3 > 任务 4
我想从任务 3 手动启动我的 DAG,最好的方法是什么?
我已阅读有关 ShortCircuitOperator
的内容,但我正在寻找更多临时解决方案,一旦触发执行即可应用。
谢谢!
您可以合并 SkipMixin that the ShortCircuitOperator uses under the hood 以跳过下游任务。
from airflow.models import BaseOperator, SkipMixin
from airflow.utils.decorators import apply_defaults
class mySkippingOperator(BaseOperator, SkipMixin)
@apply_defaults
def __init__(self,
condition,
*args,
**kwargs):
super().__init__(*args, **kwargs)
self.condition = condition
def execute(self, context):
if self.condition:
self.log.info('Proceeding with downstream tasks...')
return
self.log.info('Skipping downstream tasks...')
downstream_tasks = context['task'].get_flat_relatives(upstream=False)
self.log.debug("Downstream task_ids %s", downstream_tasks)
if downstream_tasks:
self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks)
self.log.info("Done.")
根据 Apache Airflow 的构建方式,您可以编写 logic/branches 来确定要 运行 的任务。
但是
您不能从中间的任何任务开始执行任务。顺序完全由依赖管理定义(upstream/downstrem)。
但是,如果您使用的是 celery 运算符,则可以忽略 运行 中的所有依赖项,并让 airflow 随意执行任务。话又说回来,这不会阻止上游的任务被调度。
马燕,
有一个非常肮脏但非常简单且最明显的解决方案。几乎30秒。但是,只有当您可以轻松地更新 PROD 中的代码并且能够临时阻止其他人访问 运行 DAG 时才有可能。
只评论你想跳过的任务
'#task1 > task2 >
任务 3 > 任务 4
一个更严肃但需要更多努力的解决方案可能是基于 start_from_task 的参数动态创建 DAG,在这种情况下,将使用此参数构建依赖项。可以使用 Admin==>Variables 菜单在 UI 中更改参数。您可能还可以使用前一个变量的导出时间的另一个变量。例如- DAG 将忽略 task1 和 task2,直到 14:05:30 之后 运行 整个 DAG。
是的,您只需单击任务 3。切换 运行 按钮右侧的复选框以忽略依赖项,然后单击 运行。
是的,您可以通过另一个临时基础来完成此操作。
不知何故找到了!!
您需要引发 AirflowSkipException
from airflow.exceptions import AirflowSkipException
def execute():
if condition:
raise AirflowSkipException
task = PythonOperator(task_id='task', python_callable=execute, dag=some_dag)
我想了解 Airflow 是否支持跳过 DAG 中的任务以进行临时执行?
假设我的 DAG 图是这样的: 任务 1 > 任务 2 > 任务 3 > 任务 4
我想从任务 3 手动启动我的 DAG,最好的方法是什么?
我已阅读有关 ShortCircuitOperator
的内容,但我正在寻找更多临时解决方案,一旦触发执行即可应用。
谢谢!
您可以合并 SkipMixin that the ShortCircuitOperator uses under the hood 以跳过下游任务。
from airflow.models import BaseOperator, SkipMixin
from airflow.utils.decorators import apply_defaults
class mySkippingOperator(BaseOperator, SkipMixin)
@apply_defaults
def __init__(self,
condition,
*args,
**kwargs):
super().__init__(*args, **kwargs)
self.condition = condition
def execute(self, context):
if self.condition:
self.log.info('Proceeding with downstream tasks...')
return
self.log.info('Skipping downstream tasks...')
downstream_tasks = context['task'].get_flat_relatives(upstream=False)
self.log.debug("Downstream task_ids %s", downstream_tasks)
if downstream_tasks:
self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks)
self.log.info("Done.")
根据 Apache Airflow 的构建方式,您可以编写 logic/branches 来确定要 运行 的任务。
但是
您不能从中间的任何任务开始执行任务。顺序完全由依赖管理定义(upstream/downstrem)。
但是,如果您使用的是 celery 运算符,则可以忽略 运行 中的所有依赖项,并让 airflow 随意执行任务。话又说回来,这不会阻止上游的任务被调度。
马燕, 有一个非常肮脏但非常简单且最明显的解决方案。几乎30秒。但是,只有当您可以轻松地更新 PROD 中的代码并且能够临时阻止其他人访问 运行 DAG 时才有可能。 只评论你想跳过的任务
'#task1 > task2 >
任务 3 > 任务 4
一个更严肃但需要更多努力的解决方案可能是基于 start_from_task 的参数动态创建 DAG,在这种情况下,将使用此参数构建依赖项。可以使用 Admin==>Variables 菜单在 UI 中更改参数。您可能还可以使用前一个变量的导出时间的另一个变量。例如- DAG 将忽略 task1 和 task2,直到 14:05:30 之后 运行 整个 DAG。
是的,您只需单击任务 3。切换 运行 按钮右侧的复选框以忽略依赖项,然后单击 运行。
是的,您可以通过另一个临时基础来完成此操作。 不知何故找到了!!
您需要引发 AirflowSkipException
from airflow.exceptions import AirflowSkipException
def execute():
if condition:
raise AirflowSkipException
task = PythonOperator(task_id='task', python_callable=execute, dag=some_dag)