Schedule a DAG in Airflow to run every 5 minutes, starting from today, i.e. 2019-12-18

I'm trying to run a DAG every 5 minutes starting from today (2019-12-18). I defined my start date as start_date: dt.datetime(2019, 12, 18, 10, 00, 00) and my schedule interval as schedule_interval='*/5 * * * *'. When I start the airflow scheduler, I don't see any of my tasks running.

But when I change the start_date to start_date: dt.datetime(2019, 12, 17, 10, 00, 00), i.e. yesterday's date, the DAG runs continuously, every 10 seconds instead of every 5 minutes.

I think the fix is to set the start_date correctly, but I couldn't find a proper solution. Please help!

Here is my code:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
import datetime as dt
from airflow.operators.python_operator import PythonOperator

def print_world():
    print('world')


default_args = {
    'owner': 'bhanuprakash',
    'depends_on_past': False,
    'start_date': dt.datetime(2019, 12, 18, 10, 00, 00),
    'email': ['bhanuprakash.uchula@techwave.net'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': dt.timedelta(minutes=5)
}

with DAG('dag_today',
         default_args=default_args,
         schedule_interval='*/5 * * * *'
         ) as dag:


    print_hello = BashOperator(task_id='print_hello',
        bash_command='gnome-terminal')


    sleep = BashOperator(task_id='sleep',
        bash_command='sleep 5')


    print_world = PythonOperator(task_id='print_world',
        python_callable=print_world)

print_hello >> sleep >> print_world

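The datetime objects you are passing to Airflow are not timezone-aware. Airflow uses UTC internally, and when it encounters a naive start_date it attaches its configured default time zone (UTC unless you changed default_timezone). The naive datetime you pass may therefore not line up with the scheduler's notion of time, which could be why the DAG is not being scheduled to run at 10:00 "today" (2019-12-18).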
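To make the effect concrete, here is a small stdlib-only sketch. It assumes, purely for illustration, that the scheduler host runs at a fixed UTC+05:30 offset (standing in for a zone such as Asia/Kolkata), that Airflow's default_timezone is still UTC, and that the scheduler was started at 11:00 local time; none of these details come from the question. The variable names are hypothetical.

```python
import datetime as dt

# Hypothetical local zone: a fixed UTC+05:30 offset (no DST involved).
LOCAL_TZ = dt.timezone(dt.timedelta(hours=5, minutes=30))
UTC = dt.timezone.utc

naive_start = dt.datetime(2019, 12, 18, 10, 0, 0)

# A naive start_date gets the default time zone attached (assumed UTC here)...
start_as_seen_by_airflow = naive_start.replace(tzinfo=UTC)

# ...whereas the DAG author probably meant 10:00 on the local wall clock.
start_as_intended = naive_start.replace(tzinfo=LOCAL_TZ)

# Hypothetical moment the scheduler was started: 11:00 local time that day.
scheduler_started = dt.datetime(2019, 12, 18, 11, 0, 0, tzinfo=LOCAL_TZ)

print(start_as_seen_by_airflow.astimezone(LOCAL_TZ))  # 2019-12-18 15:30:00+05:30
print(start_as_seen_by_airflow > scheduler_started)   # True: still in the future
print(start_as_intended > scheduler_started)          # False: already in the past
```

Under these assumptions the scheduler sees a start date several hours in the future, so it has nothing to run yet.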

Instead of passing a naive datetime object like this:

'start_date': dt.datetime(2019, 12, 18, 10, 00, 00)

try using pendulum to make your DAG timezone-aware:

import datetime as dt
import pendulum

local_tz = pendulum.timezone('YOUR TIMEZONE')  # See list of tz database time zones here -> https://en.wikipedia.org/wiki/List_of_tz_database_time_zones

...
'start_date': dt.datetime(2019, 12, 18, 10, 0, 0, tzinfo=local_tz),
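A subtlety worth knowing here: attaching a time zone to a wall-clock time is not the same as converting an existing datetime to that zone. astimezone() only changes how an instant is displayed, not which instant it is. The sketch below uses the standard library with a hypothetical fixed UTC+05:30 offset; pendulum's datetime objects subclass datetime, so the same distinction should apply to them.

```python
import datetime as dt

# Hypothetical local zone for illustration: a fixed UTC+05:30 offset.
LOCAL_TZ = dt.timezone(dt.timedelta(hours=5, minutes=30))
UTC = dt.timezone.utc

# Converting: midnight UTC re-expressed in local time. Same instant,
# so the scheduler would see exactly the same start as before.
converted = dt.datetime(2019, 12, 18, tzinfo=UTC).astimezone(LOCAL_TZ)

# Attaching: midnight on the local wall clock. A different instant,
# 5.5 hours earlier than midnight UTC.
attached = dt.datetime(2019, 12, 18, tzinfo=LOCAL_TZ)

print(converted)             # 2019-12-18 05:30:00+05:30
print(attached)              # 2019-12-18 00:00:00+05:30
print(converted - attached)  # 5:30:00
```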

The documentation (https://airflow.apache.org/docs/stable/timezone.html) is very helpful and gives tips on how datetimes are handled in Airflow.

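As for your other question about the run frequency: by default, Airflow performs "catchup", i.e. it creates a DAG run for every interval between the start date and the end date (or now) that has not been run yet. With yesterday's start_date and a 5-minute schedule, that is a backlog of about 12 runs for every hour since the start date (288 per day), which the scheduler kicks off back to back as fast as it can; that is why you see a new run every 10 seconds or so instead of every 5 minutes. To disable this behavior, add catchup=False when you instantiate the DAG.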
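The backlog arithmetic can be checked with a toy model. runs_to_create below is a hypothetical helper, not Airflow code: it assumes fixed-length intervals, treats an interval [t, t + interval) as due only once it has fully elapsed (the run is then labelled with t), and ignores real-world limits such as max_active_runs. The "now" value is also hypothetical.

```python
import datetime as dt

def runs_to_create(start, now, interval, catchup=True):
    """Toy model of which schedule intervals are due (not Airflow's code).

    An interval [t, t + interval) only runs once it has fully elapsed,
    i.e. when t + interval <= now; the run is labelled with t.
    """
    due = []
    t = start
    while t + interval <= now:
        due.append(t)
        t += interval
    if not catchup:
        # Keep only the most recent complete interval.
        return due[-1:]
    return due

start = dt.datetime(2019, 12, 17, 10, 0)  # yesterday's start_date
now = dt.datetime(2019, 12, 18, 10, 0)    # hypothetical "now": 24 h later
five_min = dt.timedelta(minutes=5)

print(len(runs_to_create(start, now, five_min)))                # 288
print(runs_to_create(start, now, five_min, catchup=False))      # [datetime.datetime(2019, 12, 18, 9, 55)]
print(runs_to_create(now, now + dt.timedelta(minutes=3), five_min))  # []
```

The last line also shows why a start_date of "right now" produces nothing at first: the first interval has not finished yet.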

From the Airflow docs:

Backfill and Catchup

An Airflow DAG with a start_date, possibly an end_date, and a schedule_interval defines a series of intervals which the scheduler turns into individual Dag Runs and executes. A key capability of Airflow is that these DAG Runs are atomic, idempotent items, and the scheduler, by default, will examine the lifetime of the DAG (from start to end/now, one interval at a time) and kick off a DAG Run for any interval that has not been run (or has been cleared). This concept is called Catchup.

If your DAG is written to handle its own catchup (i.e. not limited to the interval, but instead to “Now” for instance), then you will want to turn catchup off, either on the DAG itself with dag.catchup = False or by default at the configuration file level with catchup_by_default = False. What this will do is instruct the scheduler to only create a DAG Run for the most current instance of the DAG interval series.

I'd suggest reading through the two pages I linked to get a better understanding of Airflow's basic concepts.