Airflow DAG 每隔几秒运行一次,而不是配置 schedule_interval

Airflow DAG runs after every few seconds, instead of configured schedule_interval

我写了我的第一个 DAG。
DAG:

import datetime as dt
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
import pandas as pd


def csvtojson():
    df = pd.read_csv("/home/v4g4x/D/Study/DataEngineering/PaulCrickard_Book/data.csv")
    for i, r in df.iterrows():
        print(r["name"])
    df.to_json("/home/v4g4x/D/Study/DataEngineering/PaulCrickard_Book/airflow/fromAirflow.json", orient="records")


default_args = {
    "owner": "VarunGawande",
    "start_date": dt.datetime(2021, 6, 29),
    "retries": 1,
    "retry_delay": dt.timedelta(minutes=5)
}

with DAG(dag_id="MyCsvDag", default_args=default_args, schedule_interval=timedelta(minutes=5)) as dag:
    print_starting = BashOperator(task_id="Starting", bash_command=" echo \"I am reading CSV now ....\"")
    CSVJson = PythonOperator(task_id="convertCSVToJson", python_callable=csvtojson)

    print_starting >> CSVJson

配置为今天开始,每5分钟安排一个新工作,如果失败,5分钟后尝试新工作,对吗?

但是在网络服务器 UI 中,当我将 DAG 从暂停切换到未暂停时。
它每隔几秒就会开始一项新工作。

我想很明显我是新手。 我看到了类似的,但他通过更改开始日期解决了这个问题,但这对我来说没有任何意义。

catchup 似乎有问题。您可以在 DAG 定义中设置 catchup = False

基本上,如果您不将 catchup 定义为 False,则 DAG 将 运行 来自 start_date 的所有未决 运行。