如何在上一个运行ning DAG结束后将airflow DAG调度到运行?
How to schedule the airflow DAG to run just after the end of the previous running DAG?
我有一个简单的 DAG
,有 2 个 PythonOperator
和 2 分钟的计划间隔:
with DAG(dag_id='example_cron', schedule_interval='*/2 * * * *', start_date=days_ago(2)) as dag:
def task1_func(ti):
print("start task 1")
time.sleep(random.randint(0, 70))
print("end task 1")
def task2_func(ti):
print("start task 2")
time.sleep(random.randint(0, 70))
print("end task 2")
task1 = PythonOperator(task_id='task1', python_callable=task1_func, provide_context=True)
task2 = PythonOperator(task_id='task2', python_callable=task2_func, provide_context=True)
task1 >> task2
-
DAG
可以 运行 超过 2 分钟,这意味着不止一个 DAG
可以 运行 并行。
如果前一个 运行 已经完成,我如何将 DAG 配置为 运行?
您只需将 max_active_runs=1
添加到您的 DAG 对象。
with DAG(..., max_active_runs=1) as dag:
不是您问题的一部分,但请注意 days_ago(2)
已弃用,无论如何您都不应该为 start_date
使用动态日期(参见 docs)
我有一个简单的 DAG
,有 2 个 PythonOperator
和 2 分钟的计划间隔:
with DAG(dag_id='example_cron', schedule_interval='*/2 * * * *', start_date=days_ago(2)) as dag:
def task1_func(ti):
print("start task 1")
time.sleep(random.randint(0, 70))
print("end task 1")
def task2_func(ti):
print("start task 2")
time.sleep(random.randint(0, 70))
print("end task 2")
task1 = PythonOperator(task_id='task1', python_callable=task1_func, provide_context=True)
task2 = PythonOperator(task_id='task2', python_callable=task2_func, provide_context=True)
task1 >> task2
-
DAG
可以 运行 超过 2 分钟,这意味着不止一个DAG
可以 运行 并行。
如果前一个 运行 已经完成,我如何将 DAG 配置为 运行?
您只需将 max_active_runs=1
添加到您的 DAG 对象。
with DAG(..., max_active_runs=1) as dag:
不是您问题的一部分,但请注意 days_ago(2)
已弃用,无论如何您都不应该为 start_date
使用动态日期(参见 docs)