如何处理气流中单个 Dag 的不同任务间隔?
How to handle different task intervals on a single Dag in airflow?
我有一个包含多个任务的单一 dag,具有这种简单的结构,任务 A、B 和 C 可以 运行 在开始时没有任何依赖关系,但任务 D 依赖于 A 不,这是我的问题:
任务 A、B 和 C 运行 每天,但我需要在 A 成功后每周 运行 任务 D。我该如何设置这个 dag?
更改 schedule_interval 任务有效吗?这个问题有什么最佳实践吗?
感谢您的帮助。
您可以使用 ShortCircuitOperator 来完成此操作。
import airflow
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2),
'schedule_interval': '0 10 * * *'
}
dag = DAG(dag_id='example', default_args=args)
a = DummyOperator(task_id='a', dag=dag)
b = DummyOperator(task_id='b', dag=dag)
c = DummyOperator(task_id='c', dag=dag)
d = DummyOperator(task_id='d', dag=dag)
def check_trigger(execution_date, **kwargs):
return execution_date.weekday() == 0
check_trigger_d = ShortCircuitOperator(
task_id='check_trigger_d',
python_callable=check_trigger,
provide_context=True,
dag=dag
)
a.set_downstream(b)
b.set_downstream(c)
a.set_downstream(check_trigger_d)
# Perform D only if trigger function returns a true value
check_trigger_d.set_downstream(d)
在 Airflow 版本 >= 2.1.0 中,您可以使用完全适合您的情况的 BranchDayOfWeekOperator。
查看此answer了解更多详情。
我有一个包含多个任务的单一 dag,具有这种简单的结构,任务 A、B 和 C 可以 运行 在开始时没有任何依赖关系,但任务 D 依赖于 A 不,这是我的问题:
任务 A、B 和 C 运行 每天,但我需要在 A 成功后每周 运行 任务 D。我该如何设置这个 dag?
更改 schedule_interval 任务有效吗?这个问题有什么最佳实践吗?
感谢您的帮助。
您可以使用 ShortCircuitOperator 来完成此操作。
import airflow
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2),
'schedule_interval': '0 10 * * *'
}
dag = DAG(dag_id='example', default_args=args)
a = DummyOperator(task_id='a', dag=dag)
b = DummyOperator(task_id='b', dag=dag)
c = DummyOperator(task_id='c', dag=dag)
d = DummyOperator(task_id='d', dag=dag)
def check_trigger(execution_date, **kwargs):
return execution_date.weekday() == 0
check_trigger_d = ShortCircuitOperator(
task_id='check_trigger_d',
python_callable=check_trigger,
provide_context=True,
dag=dag
)
a.set_downstream(b)
b.set_downstream(c)
a.set_downstream(check_trigger_d)
# Perform D only if trigger function returns a true value
check_trigger_d.set_downstream(d)
在 Airflow 版本 >= 2.1.0 中,您可以使用完全适合您的情况的 BranchDayOfWeekOperator。
查看此answer了解更多详情。