气流 - 运行 每 2 小时一次

Airflow - Run a dag every 2h

我有一个在 MySQL table 中插入一些记录的 dag,我想每 2 小时 运行 我的 dag。为此,我有这个代码:

from datetime import timedelta, datetime
import pymysql
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators import MySqlOperator

default_args = {
 'owner': 'airflow',
 'start_date': datetime(2021, 4, 1),
 'depends_on_past': False
}

dag = DAG(dag_id='mysql_insert',
          default_args=default_args,
          schedule_interval='* */2 * * *',
          dagrun_timeout=timedelta(seconds=5))

def mysql_conn(servername, user, password, database):
    conn = pymysql.connect(user=user
                           , password=password
                           , host=servername
                           , port=3306
                           , database=database)
    return conn

conn = mysql_conn('', '', '', '')

def insert_data():
    src_cursor = conn.cursor()
    src_cursor.execute("INSERT INTO my_table VALUES('NA',CURRENT_TIMESTAMP())")
    conn.commit()

task = PythonOperator(
     task_id='insert_records'
    ,python_callable=insert_data
    ,dag=dag)

task

但是,当我激活我的 dag 时,它每 10 秒加载一次数据。

我做错了什么?

感谢您的帮助!

您已将 start date 设置为 2021-04-01,并且 catchup 未在 dag 定义中设置,因此默认为 true。 如果您查看作业执行(转到浏览 -> Dag 运行),您会发现 dag 运行从开始日期开始就已经开始并且处于赶上状态。从 2021-04-01 开始,您将看到 execution date。 如果您想跳过这些追赶运行,请在 dag 参数中将 catchup 标记为 False