气流计划未更新
Airflow schedule not updating
我创建了一个每周 运行 的 DAG。以下是我的尝试,它按预期工作。
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
SCHEDULE_INTERVAL = timedelta(weeks=1, seconds=00, minutes=00, hours=00)
default_args = {
'depends_on_past': False,
'retries': 0,
'retry_delay': timedelta(minutes=2),
'wait_for_downstream': True,
'provide_context': True,
'start_date': datetime(2020, 12, 20, hour=00, minute=00, second=00)
}
with DAG("DAG", default_args=default_args, schedule_interval=SCHEDULE_INTERVAL, catchup=True) as dag:
t1 = BashOperator(
task_id='dag_schedule',
bash_command='echo DAG',
dag=dag)
根据时间表,它 运行 在 27 日(即剧本中的 20 日)。由于要求发生变化,现在我将开始日期更新为 30 日(即脚本中的 23 日)而不是 27 日(我的想法是从 30 日开始安排,每周从那里开始)。当我更改 DAG 的时间表时,即将开始日期从 27 日更改为 30 日。 DAG 没有按照最晚的开始日期进行挑选,不确定为什么?当我删除 DAG(因为它是测试 DAG,我删除了它,在产品中我无法删除它)并创建了具有相同名称的新 DAG,最新开始日期即 30 日,它是 运行ning时间表。
您定义的 DAG 将在 6-Jan-2021
上触发
Airflow 在时间间隔结束时安排任务 (See doc reference)
所以根据您的设置:
SCHEDULE_INTERVAL = timedelta(weeks=1, seconds=00, minutes=00, hours=00)
和
'start_date': datetime(2020, 12 , 30, hour=00, minute=00, second=00)
这意味着第一个 运行 将出现在 6-Jan-2021
因为 30-Dec-2020
+ 1 week
= 6-Jan-2021
注意这个的 execution_date
运行 将是 2020-12-30
根据 Airflow DOC
When needing to change your start_date and schedule interval, change the name of the dag (a.k.a. dag_id) - I follow the convention : my_dag_v1, my_dag_v2, my_dag_v3, my_dag_v4, etc...
- Changing schedule interval always requires changing the dag_id, because previously run TaskInstances will not align with the new schedule interval
- Changing start_date without changing schedule_interval is safe, but changing to an earlier start_date will not create any new DagRuns for the time between the new start_date and the old one, so tasks will not automatically backfill to the new dates. If you manually create DagRuns, tasks will be scheduled, as long as the DagRun date is after both the task start_date and the dag start_date.
因此,如果我们更改开始日期,我们需要更改 DAG 名称或删除现有 DAG,以便再次使用相同名称重新创建它(与先前 DAG 相关的元数据将从元数据中删除)
我创建了一个每周 运行 的 DAG。以下是我的尝试,它按预期工作。
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
SCHEDULE_INTERVAL = timedelta(weeks=1, seconds=00, minutes=00, hours=00)
default_args = {
'depends_on_past': False,
'retries': 0,
'retry_delay': timedelta(minutes=2),
'wait_for_downstream': True,
'provide_context': True,
'start_date': datetime(2020, 12, 20, hour=00, minute=00, second=00)
}
with DAG("DAG", default_args=default_args, schedule_interval=SCHEDULE_INTERVAL, catchup=True) as dag:
t1 = BashOperator(
task_id='dag_schedule',
bash_command='echo DAG',
dag=dag)
根据时间表,它 运行 在 27 日(即剧本中的 20 日)。由于要求发生变化,现在我将开始日期更新为 30 日(即脚本中的 23 日)而不是 27 日(我的想法是从 30 日开始安排,每周从那里开始)。当我更改 DAG 的时间表时,即将开始日期从 27 日更改为 30 日。 DAG 没有按照最晚的开始日期进行挑选,不确定为什么?当我删除 DAG(因为它是测试 DAG,我删除了它,在产品中我无法删除它)并创建了具有相同名称的新 DAG,最新开始日期即 30 日,它是 运行ning时间表。
您定义的 DAG 将在 6-Jan-2021
Airflow 在时间间隔结束时安排任务 (See doc reference)
所以根据您的设置:
SCHEDULE_INTERVAL = timedelta(weeks=1, seconds=00, minutes=00, hours=00)
和
'start_date': datetime(2020, 12 , 30, hour=00, minute=00, second=00)
这意味着第一个 运行 将出现在 6-Jan-2021
因为 30-Dec-2020
+ 1 week
= 6-Jan-2021
注意这个的 execution_date
运行 将是 2020-12-30
根据 Airflow DOC
When needing to change your start_date and schedule interval, change the name of the dag (a.k.a. dag_id) - I follow the convention : my_dag_v1, my_dag_v2, my_dag_v3, my_dag_v4, etc...
- Changing schedule interval always requires changing the dag_id, because previously run TaskInstances will not align with the new schedule interval
- Changing start_date without changing schedule_interval is safe, but changing to an earlier start_date will not create any new DagRuns for the time between the new start_date and the old one, so tasks will not automatically backfill to the new dates. If you manually create DagRuns, tasks will be scheduled, as long as the DagRun date is after both the task start_date and the dag start_date.
因此,如果我们更改开始日期,我们需要更改 DAG 名称或删除现有 DAG,以便再次使用相同名称重新创建它(与先前 DAG 相关的元数据将从元数据中删除)