Airflow DAG 每隔几秒运行一次,而不是配置 schedule_interval
Airflow DAG runs after every few seconds, instead of configured schedule_interval
我写了我的第一个 DAG。
DAG:
import datetime as dt
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
import pandas as pd
def csvtojson():
df = pd.read_csv("/home/v4g4x/D/Study/DataEngineering/PaulCrickard_Book/data.csv")
for i, r in df.iterrows():
print(r["name"])
df.to_json("/home/v4g4x/D/Study/DataEngineering/PaulCrickard_Book/airflow/fromAirflow.json", orient="records")
default_args = {
"owner": "VarunGawande",
"start_date": dt.datetime(2021, 6, 29),
"retries": 1,
"retry_delay": dt.timedelta(minutes=5)
}
with DAG(dag_id="MyCsvDag", default_args=default_args, schedule_interval=timedelta(minutes=5)) as dag:
print_starting = BashOperator(task_id="Starting", bash_command=" echo \"I am reading CSV now ....\"")
CSVJson = PythonOperator(task_id="convertCSVToJson", python_callable=csvtojson)
print_starting >> CSVJson
配置为今天开始,每5分钟安排一个新工作,如果失败,5分钟后尝试新工作,对吗?
但是在网络服务器 UI 中,当我将 DAG 从暂停切换到未暂停时。
它每隔几秒就会开始一项新工作。
我想很明显我是新手。
我看到了类似的,但他通过更改开始日期解决了这个问题,但这对我来说没有任何意义。
catchup
似乎有问题。您可以在 DAG
定义中设置 catchup = False
。
基本上,如果您不将 catchup
定义为 False
,则 DAG 将 运行 来自 start_date
的所有未决 运行。
我写了我的第一个 DAG。
DAG:
import datetime as dt
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
import pandas as pd
def csvtojson():
df = pd.read_csv("/home/v4g4x/D/Study/DataEngineering/PaulCrickard_Book/data.csv")
for i, r in df.iterrows():
print(r["name"])
df.to_json("/home/v4g4x/D/Study/DataEngineering/PaulCrickard_Book/airflow/fromAirflow.json", orient="records")
default_args = {
"owner": "VarunGawande",
"start_date": dt.datetime(2021, 6, 29),
"retries": 1,
"retry_delay": dt.timedelta(minutes=5)
}
with DAG(dag_id="MyCsvDag", default_args=default_args, schedule_interval=timedelta(minutes=5)) as dag:
print_starting = BashOperator(task_id="Starting", bash_command=" echo \"I am reading CSV now ....\"")
CSVJson = PythonOperator(task_id="convertCSVToJson", python_callable=csvtojson)
print_starting >> CSVJson
配置为今天开始,每5分钟安排一个新工作,如果失败,5分钟后尝试新工作,对吗?
但是在网络服务器 UI 中,当我将 DAG 从暂停切换到未暂停时。
它每隔几秒就会开始一项新工作。
我想很明显我是新手。
我看到了类似的
catchup
似乎有问题。您可以在 DAG
定义中设置 catchup = False
。
基本上,如果您不将 catchup
定义为 False
,则 DAG 将 运行 来自 start_date
的所有未决 运行。