Airflow DAG 运行 每秒而不是每分钟

Airflow DAG Running Every Second Rather Than Every Minute

我正在尝试将我的 DAG 安排到每分钟 运行,但它似乎是每秒 运行ning。根据我读过的所有内容,我只需要在我的 DAG 中包含 schedule_interval='*/1 * * * *', #..every 1 minute 就可以了,但它不起作用。这里我设置了一个简单的例子来测试它:

from airflow import DAG
from airflow.operators import SimpleHttpOperator, HttpSensor, EmailOperator, S3KeySensor
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 6, 4),
    'schedule_interval': '*/1 * * * *', #..every 1 minute
    'email': ['airflow@airflow.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=1)
}

dag = DAG(
    dag_id='airflow_slack_example',
    start_date=datetime(2018, 6, 4),
    max_active_runs=3,
    schedule_interval='*/1 * * * *', #..every 1 minute
    default_args=default_args,
)

test= BashOperator(
    task_id='test',
    bash_command="echo hey >> /home/ec2-user/schedule_test.txt",
    retries=1,
    dag=dag)

更新:

在与@Taylor Edmiston 讨论他的解决方案后,我们意识到我需要添加 catchup=False 的原因是因为我使用 Pip 安装了 Airflow,它使用了过时版本的 Airflow。显然,如果您使用 master branch of it's repository 中的 Airflow,则无需像我尝试的那样包含 catchup=False 以使其每分钟 运行。因此,尽管接受的答案解决了我的问题,但它并没有解决@Taylor Edmiston 发现的潜在问题。

尝试在 DAG() 中添加 catchup=False。可能是因为您声明的 start_date,您的 DAG 正在尝试回填。

您在 DAG 上的 schedule_interval 是正确的:*/1 * * * *every minute

您还可以从 default_args 中删除 start_dateschedule_interval,因为它们与提供给 DAG 的 kwargs 是多余的。

如果您更改了第一次创建此 DAG 时的计划,Airflow 可能会感到困惑。在数据库中尝试 ,然后重新启动调度程序和网络服务器。如果你在 Airflow 的 master 分支上,它就像 $ airflow delete_dag my_dag 一样简单;否则,链接的答案解释了如何在其他版本上执行此操作。

我将你的代码归结为这个来检查,当 运行 在 Airflow 的主分支中时,它肯定是每分钟 运行ning 一个 DAG 运行。

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
}

dag = DAG(
    dag_id='airflow_slack_example',
    start_date=datetime(2018, 6, 4),
    schedule_interval='*/1 * * * *',
    default_args=default_args,
)

test = BashOperator(
    task_id='test',
    bash_command='echo "one minute test"',
    dag=dag,
)

DAG 运行s: