Apache Airflow 中调度程序间隔和启动时间的工作不正确
Incorrect work of scheduler interval and start time in Apache Airflow
找不到任务开始时间的解决方案。我有代码,找不到我错的地方。
当我有 运行 DAG 时,25.03、26.03、27.03。任务已完成,但今天 (28.03) 任务未在 6:48 开始。
我试过用cron表达式,pendulum,datetime,结果都是一样的。本地时间(UTC+3)和气流的时间(UTC)是不同的。我尝试在 'start date' 或 'schedule interval' 中每次(本地,气流)使用 - 没有结果。
使用:Ubuntu、Airflow v.1.9.0 和本地执行器。
emailname = Variable.get('test_mail')
l_start_date = datetime(2018, 3, 25, 6, 48)
l_schedule_interval = '@daily'
WORKFLOW_DEFAULT_ARGS = {
'owner': 'owner',
'depends_on_past': True,
'start_date': l_start_date,
'email': emailname,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retries_delay': timedelta(minutes=1),
}
# initialize the DAG
dag = DAG(
dag_id='test_dag_mail',
default_args=WORKFLOW_DEFAULT_ARGS,
schedule_interval=l_schedule_interval,
start_date=l_start_date,
)
这是 Airflow 的一个特性:
Note that if you run a DAG on a schedule_interval of one day, the run stamped 2016-01-01 will be trigger soon after 2016-01-01T23:59. In other words, the job instance is started once the period it covers has ended.
Let’s Repeat That: The scheduler runs your job one schedule_interval AFTER the start date, at the END of the period.
scheduler page 没有很好地解释 execution_date
设置为前一个 运行 和当前 运行 之间的时间段的开始。为什么?好吧,假设每月、每天、每小时的作业需要读取从那个时间到当前间隔时间的所有数据。是的,他们本可以改变它。
示例(将 nano/millis 秒缩短为一位数):
DAG start_date DAG schedule_interval Task Started Task execution_date
2017-12-01 '@daily' 2017-12-02 00:00:02.8 2017-12-01 00:00:00.0
2017-12-03 00:00:01.4 2017-12-02 00:00:00.0
2017-12-01 '@weekly' 2017-12-08 00:00:01.5 2017-12-01 00:00:00.0
2017-12-15 00:00:03.9 2017-12-08 00:00:00.0
2017-12-01 '33 03 * * *' 2017-12-02 03:33:01.6 2017-12-01 03:33:00.0
2017-12-03 03:33:02.2 2017-12-02 03:33:00.0
2017-12-01 '33 03 * * 2' 2017-12-12 03:33:01.7 2017-12-05 03:33:00.0
2017-12-19 03:33:03.1 2017-12-12 03:33:00.0
对于给出的最后一个案例,请注意开始日期不是间隔中要求的星期二;因为令人困惑的是开始时间可能与间隔时间不一致,我还没有完全测试过这个并建议如果你想每个星期二 运行,你的开始时间应该是星期二,比如 2017-12- 05 或 2017-11-28.
2017-12 年的参考资料:
# December 2017
Su Mo Tu We Th Fr Sa
1 2
3 4 5 6 7 8 9
10 11 12 13 14 15 16
17 18 19 20 21 22 23
24 25 26 27 28 29 30
31
因此,在您的 DAG 任务中,使用 运行 和 {{ds}}
或 {{execution_date}}
提供的上下文中的 jinja2 模板将引用示例的最后一列,而不是datetime.now()
会告诉您关于倒数第二列的内容。
提供这些是为了使您的任务可以是幂等的:f(data) = f(f(data))。
如果你 运行 你的 DAG 一次,最终状态应该和你 运行 你的 DAG N 次一样。这样,如果您 运行 您的(线性)DAG 和 5 个任务中的 3 个成功,但任务 4 失败而任务 5 从未 运行,您可以 re-run 整个 dag,任务 1- 3 将 pre-clean 或覆盖数据,这样输出就不会从先前成功的那些任务中改变,然后任务 4 和 5 有望成功,您将处于最终的良好状态。
找不到任务开始时间的解决方案。我有代码,找不到我错的地方。
当我有 运行 DAG 时,25.03、26.03、27.03。任务已完成,但今天 (28.03) 任务未在 6:48 开始。
我试过用cron表达式,pendulum,datetime,结果都是一样的。本地时间(UTC+3)和气流的时间(UTC)是不同的。我尝试在 'start date' 或 'schedule interval' 中每次(本地,气流)使用 - 没有结果。
使用:Ubuntu、Airflow v.1.9.0 和本地执行器。
emailname = Variable.get('test_mail')
l_start_date = datetime(2018, 3, 25, 6, 48)
l_schedule_interval = '@daily'
WORKFLOW_DEFAULT_ARGS = {
'owner': 'owner',
'depends_on_past': True,
'start_date': l_start_date,
'email': emailname,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retries_delay': timedelta(minutes=1),
}
# initialize the DAG
dag = DAG(
dag_id='test_dag_mail',
default_args=WORKFLOW_DEFAULT_ARGS,
schedule_interval=l_schedule_interval,
start_date=l_start_date,
)
这是 Airflow 的一个特性:
Note that if you run a DAG on a schedule_interval of one day, the run stamped 2016-01-01 will be trigger soon after 2016-01-01T23:59. In other words, the job instance is started once the period it covers has ended.
Let’s Repeat That: The scheduler runs your job one schedule_interval AFTER the start date, at the END of the period.
scheduler page 没有很好地解释 execution_date
设置为前一个 运行 和当前 运行 之间的时间段的开始。为什么?好吧,假设每月、每天、每小时的作业需要读取从那个时间到当前间隔时间的所有数据。是的,他们本可以改变它。
示例(将 nano/millis 秒缩短为一位数):
DAG start_date DAG schedule_interval Task Started Task execution_date
2017-12-01 '@daily' 2017-12-02 00:00:02.8 2017-12-01 00:00:00.0
2017-12-03 00:00:01.4 2017-12-02 00:00:00.0
2017-12-01 '@weekly' 2017-12-08 00:00:01.5 2017-12-01 00:00:00.0
2017-12-15 00:00:03.9 2017-12-08 00:00:00.0
2017-12-01 '33 03 * * *' 2017-12-02 03:33:01.6 2017-12-01 03:33:00.0
2017-12-03 03:33:02.2 2017-12-02 03:33:00.0
2017-12-01 '33 03 * * 2' 2017-12-12 03:33:01.7 2017-12-05 03:33:00.0
2017-12-19 03:33:03.1 2017-12-12 03:33:00.0
对于给出的最后一个案例,请注意开始日期不是间隔中要求的星期二;因为令人困惑的是开始时间可能与间隔时间不一致,我还没有完全测试过这个并建议如果你想每个星期二 运行,你的开始时间应该是星期二,比如 2017-12- 05 或 2017-11-28.
2017-12 年的参考资料:
# December 2017
Su Mo Tu We Th Fr Sa
1 2
3 4 5 6 7 8 9
10 11 12 13 14 15 16
17 18 19 20 21 22 23
24 25 26 27 28 29 30
31
因此,在您的 DAG 任务中,使用 运行 和 {{ds}}
或 {{execution_date}}
提供的上下文中的 jinja2 模板将引用示例的最后一列,而不是datetime.now()
会告诉您关于倒数第二列的内容。
提供这些是为了使您的任务可以是幂等的:f(data) = f(f(data))。
如果你 运行 你的 DAG 一次,最终状态应该和你 运行 你的 DAG N 次一样。这样,如果您 运行 您的(线性)DAG 和 5 个任务中的 3 个成功,但任务 4 失败而任务 5 从未 运行,您可以 re-run 整个 dag,任务 1- 3 将 pre-clean 或覆盖数据,这样输出就不会从先前成功的那些任务中改变,然后任务 4 和 5 有望成功,您将处于最终的良好状态。