气流:如何安排 dag 在工作日的第二天开始?

Airflow: how to schedule a dag to start the day following a weekday?

如何安排一个 dag 的执行日期是工作日,但开始日期是次日(不一定是工作日)?

我的理由是我在每个工作日结束时获取数据,我想在第二天早上处理这些数据。气流 common pitfalls 将执行日期描述为数据所属的日期,而开始日期是您 运行 您的 ETL 的日期。


例如:我想要一系列 dag 运行s 具有以下执行和开始日期 -

DAG start_date      Task Started          Task execution_date
2018-01-01          2018-01-02 Tues       2018-01-01 Mon
                    2018-01-03 Wed        2018-01-02 Tues
                    2018-01-04 Thur       2018-01-03 Wed
                    2018-01-05 Fri        2018-01-04 Thur
                    2018-01-06 Sat        2018-01-05 Fri
                    2018-01-06 Tues       2018-01-08 Mon

我最接近的方法是使用时间表:0 2 * * TUE-SAT 它在星期二开始时执行日期(星期六)错误(见下文)

DAG start_date      Task Started          Task execution_date
2018-01-01          2018-01-03 Wed        2018-01-02 Tues
                    2018-01-04 Thur       2018-01-03 Wed
                    2018-01-05 Fri        2018-01-04 Thur
                    2018-01-06 Sat        2018-01-05 Fri
                    2018-01-09 Tues       2018-01-06 Sat

或时间表:0 2 * * MON-FRI 运行 星期五 DAG 到星期一,我需要周末的结果。

DAG start_date      Task Started          Task execution_date
2018-01-01          2018-01-02 Tues       2018-01-01 Mon
                    2018-01-03 Wed        2018-01-02 Tues
                    2018-01-04 Thur       2018-01-03 Wed
                    2018-01-05 Fri        2018-01-04 Thur
                    2018-01-08 Mon        2018-01-05 Fri
                    2018-01-06 Tues       2018-01-08 Mon

首先引用Airflow docs

Note that if you run a DAG on a schedule_interval of one day, the run stamped 2016-01-01 will be trigger soon after 2016-01-01T23:59. In other words, the job instance is started once the period it covers has ended.

Let’s Repeat That The scheduler runs your job one schedule_interval AFTER the start date, at the END of the period.

那么这里发生了什么?

  1. Cron 指定周期

指定 0 2 * * MON-FRI 意味着您的经期是:

MON 2AM -> TUE 2AM
TUE 2AM -> WED 2AM
WED 2AM -> THU 2AM
THU 2AM -> FRI 2AM
FRI 2AM -> MON 2AM <- the problem
  1. Airflow 将执行日期设置为周期的开始,并等待它的结束。

这意味着您所需的执行日期定义了周期的结束日期,但您所需的数据分区紧随周期的开始。

长话短说:不可能指定周的周期性划分,使得每个周期都从工作日开始到第二天结束。为什么?因为没有句号来表示周末发生的事情。

如何进行有效的周期性划分?

  • 只需将其设置为每天凌晨 2 点,并在 DAG 的开头放置一个条件任务,如果执行日期是周末则跳过执行。
  • 使用 0 2 * * TUE-SAT 但不要相信 execution_date 表示您的下一个要处理的数据何时开始确切地,但是当您过去的数据是视为已处理。