Apache Airflow 中调度程序间隔和启动时间的工作不正确

Incorrect work of scheduler interval and start time in Apache Airflow

找不到任务开始时间的解决方案。我有代码,找不到我错的地方。

当我有 运行 DAG 时,25.03、26.03、27.03。任务已完成,但今天 (28.03) 任务未在 6:48 开始。

我试过用cron表达式,pendulum,datetime,结果都是一样的。本地时间(UTC+3)和气流的时间(UTC)是不同的。我尝试在 'start date' 或 'schedule interval' 中每次(本地,气流)使用 - 没有结果。

使用:Ubuntu、Airflow v.1.9.0 和本地执行器。

emailname = Variable.get('test_mail')
l_start_date = datetime(2018, 3, 25, 6, 48) 
l_schedule_interval = '@daily'

WORKFLOW_DEFAULT_ARGS = {
    'owner': 'owner',
    'depends_on_past': True,
    'start_date': l_start_date,
    'email': emailname,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retries_delay': timedelta(minutes=1),
}

# initialize the DAG
dag = DAG(
    dag_id='test_dag_mail',
    default_args=WORKFLOW_DEFAULT_ARGS,
    schedule_interval=l_schedule_interval,
    start_date=l_start_date,
 )

这是 Airflow 的一个特性:

Note that if you run a DAG on a schedule_interval of one day, the run stamped 2016-01-01 will be trigger soon after 2016-01-01T23:59. In other words, the job instance is started once the period it covers has ended.

Let’s Repeat That: The scheduler runs your job one schedule_interval AFTER the start date, at the END of the period.

发件人:https://airflow.apache.org/scheduler.html

scheduler page 没有很好地解释 execution_date 设置为前一个 运行 和当前 运行 之间的时间段的开始。为什么?好吧,假设每月、每天、每小时的作业需要读取从那个时间到当前间隔时间的所有数据。是的,他们本可以改变它。

示例(将 nano/millis 秒缩短为一位数):

DAG start_date DAG schedule_interval Task Started          Task execution_date
2017-12-01     '@daily'              2017-12-02 00:00:02.8 2017-12-01 00:00:00.0
                                     2017-12-03 00:00:01.4 2017-12-02 00:00:00.0

2017-12-01     '@weekly'             2017-12-08 00:00:01.5 2017-12-01 00:00:00.0
                                     2017-12-15 00:00:03.9 2017-12-08 00:00:00.0

2017-12-01     '33 03 * * *'         2017-12-02 03:33:01.6 2017-12-01 03:33:00.0
                                     2017-12-03 03:33:02.2 2017-12-02 03:33:00.0

2017-12-01     '33 03 * * 2'         2017-12-12 03:33:01.7 2017-12-05 03:33:00.0
                                     2017-12-19 03:33:03.1 2017-12-12 03:33:00.0

对于给出的最后一个案例,请注意开始日期不是间隔中要求的星期二;因为令人困惑的是开始时间可能与间隔时间不一致,我还没有完全测试过这个并建议如果你想每个星期二 运行,你的开始时间应该是星期二,比如 2017-12- 05 或 2017-11-28.

2017-12 年的参考资料:

#  December 2017
Su Mo Tu We Th Fr Sa
                1  2
 3  4  5  6  7  8  9
10 11 12 13 14 15 16
17 18 19 20 21 22 23
24 25 26 27 28 29 30
31

因此,在您的 DAG 任务中,使用 运行 和 {{ds}}{{execution_date}} 提供的上下文中的 jinja2 模板将引用示例的最后一列,而不是datetime.now() 会告诉您关于倒数第二列的内容。

提供这些是为了使您的任务可以是幂等的:f(data) = f(f(data))。

如果你 运行 你的 DAG 一次,最终状态应该和你 运行 你的 DAG N 次一样。这样,如果您 运行 您的(线性)DAG 和 5 个任务中的 3 个成功,但任务 4 失败而任务 5 从未 运行,您可以 re-run 整个 dag,任务 1- 3 将 pre-clean 或覆盖数据,这样输出就不会从先前成功的那些任务中改变,然后任务 4 和 5 有望成功,您将处于最终的良好状态。