如何使用气流安排任务

How to schedule a task with airflow

不幸的是,即使阅读了这里的许多问题和airflow 网站的FAQ 页面,我仍然不明白airflow 是如何调度任务的。我这里有一个非常简单的示例任务:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    "depends_on_past": False,
    "start_date": datetime(2020, 5, 29),
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    "example_dag_one",
    schedule_interval="30 8 * * *",
    catchup=False,
    default_args=default_args,
)

with dag:

    t1 = BashOperator(task_id="print_hello", bash_command="echo hello", dag=dag)

    t1

我的天真观点是,该任务将在 5 月 29 日 08:30 运行。但是随着时间的推移,airflow已经没有调度那个任务了。如果我将 cron 表达式更改为类似:'* 8 * * *' 它将每分钟安排一个任务。

然而,当我使用同一个 DAG,开始日期是昨天(在这种情况下是 5 月 28 日)时,任务将安排在 08:30,但它的执行日期是 28 日(即使它 运行 5 月 29 日),网络上的开始日期 ui 是 5 月 29 日。这非常令人困惑。

最终我想要的气流很简单:"Here is python code, run it on this time day"。那我怎么能做到呢。再次假设我想从明天开始每天在 08:30 安排一个任务。

答案可以在Airflow official documentation中找到:

请注意,如果您在一天的 schedule_interval 上 运行 DAG,则 运行 标记为 2016-01-01 将在 2016-01-01T23 后不久触发: 59.换句话说,作业实例在其涵盖的时间段结束后启动。

让我们重复一遍 调度程序 运行你的工作 schedule_interval 在开始日期之后,在期间结束时。

因此适用于您的案例,如果您将开始日期设为 5 月 29 日,而原始 cron 将从明天 5 月 30 日开始每天 运行 08:30。

无论如何,如果你不需要在一天中的某个特定时间点进行 dag,你可以将调度间隔设置为'@daily',它会在每个时间的开始(00:00)触发日。如果@daily 有很多 dags,不用担心,调度器和 worker 会知道如何处理它来执行所有这些。如果您有依赖于其他 dag 的 dag,可以使用一些机制将它们连接起来,这样您仍然不必担心指定小时数。

实际上Airflow会等待整个调度间隔(1天)完成,然后开始执行!

因此,如果您希望今天2020/ 5/ 29 执行您的任务,您应该以计划间隔完成的方式设置开始时间。所以将开始时间设置为:datetime(2020, 5, 28)

如果计划间隔为 1 周,则任务将在开始时间后 1 周启动,依此类推...