气流 - 运行 每 2 小时一次
Airflow - Run a dag every 2h
我有一个在 MySQL table 中插入一些记录的 dag,我想每 2 小时 运行 我的 dag。为此,我有这个代码:
from datetime import timedelta, datetime
import pymysql
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators import MySqlOperator
default_args = {
'owner': 'airflow',
'start_date': datetime(2021, 4, 1),
'depends_on_past': False
}
dag = DAG(dag_id='mysql_insert',
default_args=default_args,
schedule_interval='* */2 * * *',
dagrun_timeout=timedelta(seconds=5))
def mysql_conn(servername, user, password, database):
conn = pymysql.connect(user=user
, password=password
, host=servername
, port=3306
, database=database)
return conn
conn = mysql_conn('', '', '', '')
def insert_data():
src_cursor = conn.cursor()
src_cursor.execute("INSERT INTO my_table VALUES('NA',CURRENT_TIMESTAMP())")
conn.commit()
task = PythonOperator(
task_id='insert_records'
,python_callable=insert_data
,dag=dag)
task
但是,当我激活我的 dag 时,它每 10 秒加载一次数据。
我做错了什么?
感谢您的帮助!
您已将 start date
设置为 2021-04-01,并且 catchup
未在 dag 定义中设置,因此默认为 true
。
如果您查看作业执行(转到浏览 -> Dag 运行),您会发现 dag 运行从开始日期开始就已经开始并且处于赶上状态。从 2021-04-01 开始,您将看到 execution date
。
如果您想跳过这些追赶运行,请在 dag 参数中将 catchup
标记为 False
。
我有一个在 MySQL table 中插入一些记录的 dag,我想每 2 小时 运行 我的 dag。为此,我有这个代码:
from datetime import timedelta, datetime
import pymysql
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators import MySqlOperator
default_args = {
'owner': 'airflow',
'start_date': datetime(2021, 4, 1),
'depends_on_past': False
}
dag = DAG(dag_id='mysql_insert',
default_args=default_args,
schedule_interval='* */2 * * *',
dagrun_timeout=timedelta(seconds=5))
def mysql_conn(servername, user, password, database):
conn = pymysql.connect(user=user
, password=password
, host=servername
, port=3306
, database=database)
return conn
conn = mysql_conn('', '', '', '')
def insert_data():
src_cursor = conn.cursor()
src_cursor.execute("INSERT INTO my_table VALUES('NA',CURRENT_TIMESTAMP())")
conn.commit()
task = PythonOperator(
task_id='insert_records'
,python_callable=insert_data
,dag=dag)
task
但是,当我激活我的 dag 时,它每 10 秒加载一次数据。
我做错了什么?
感谢您的帮助!
您已将 start date
设置为 2021-04-01,并且 catchup
未在 dag 定义中设置,因此默认为 true
。
如果您查看作业执行(转到浏览 -> Dag 运行),您会发现 dag 运行从开始日期开始就已经开始并且处于赶上状态。从 2021-04-01 开始,您将看到 execution date
。
如果您想跳过这些追赶运行,请在 dag 参数中将 catchup
标记为 False
。