运行 每天特定时间的DAG

Run DAG at specific time each day

我已经多次阅读了关于 schedule_intervalstart_date 和 Airflow 文档的多个示例,但我仍然无法理解:

如何在每天的特定时间执行我的 DAG?例如,现在是 9:30 (AM),我部署了我的 DAG,我希望它在 10:30

执行

我试过了


with DAG(
    "test",
    default_args=default_args,
    description= "test",
    schedule_interval = "0 10 * * *",
    start_date = days_ago(0),
    tags = ["goodie"]) as dag:

但出于某种原因,今天不是 运行。我尝试了不同的 start_datesstart_date = datetime.datetime(2021,6,23) 但它没有被执行。

如果我用 days_ago(1) 替换 days_ago(0) 它总是落后 1 天,即今天没有 运行 但昨天 运行

有没有一种简单的方法可以说“我现在部署我的 DAG,我想用这个 cron-syntax 来执行它”(我假设这是大多数人想要的)而不是计算执行时间, 根据 start_date, schedule_interval 算出来, 怎么解释?

If I replace days_ago(0) with days_ago(1) it is behind 1 day all the time

没落后。您只是将 Airflow 调度机制与 cron 作业混淆了。在 cron 作业中,您只需提供一个 cron 表达式,它就会相应地进行调度——这不是它在 Airflow 中的工作方式。

在 Airflow 中,调度由 start_date + schedule interval 计算得出。 Airflow 在间隔结束时执行作业。这与数据管道通常的工作方式一致。今天您正在处理昨天的数据,因此在这一天结束时您想要启动一个处理昨天记录的过程。

通常 - 永远不要使用动态开始日期。

设置:

with DAG(
    "test",
    default_args=default_args,
    description= "test",
    schedule_interval = "0 10 * * *",
    start_date = datetime(2021,06,23, 10 ,0), # 2021-06-23 10:00
    tags = ["goodie"]) as dag:

意味着第一个将在 2021-06-24 10:00 开始,这个 运行 execution_date 将是 2021-06-23 10:00。第二个 运行 将在 2021-06-25 10:00 开始,这个 运行 execution_date 将是 2021-06-24 10:00

由于这让许多新用户感到困惑,因此正在进行架构更改 AIP-39 Richer scheduler_interval,它将在 WHEN 到 运行 和考虑此 [=40= 的时间间隔之间解耦].它将在 Airflow 2.3.0

中可用

更新 Airflow>=2.3.0: AIP-39 Richer scheduler_interval 已完成并发布 它添加了 Timetable support so you can Customizing DAG Scheduling with Timetables