气流:如何确保每 5 分钟后 DAG 运行?
Airflow: How to make sure that DAG run after every 5 minutes?
我正在探索Apache Airflow
。我正在使用一种在 MySQL 中插入记录的方法。
我已将 DAG
安排为每 5 分钟后 运行 但它似乎没有发生,因为 MYSQL 时间戳告诉 MySQL 任务正在执行很多5 分钟以内。
如您所见,它在几分钟内插入了记录。下面是我的代码:
import datetime as dt
from airflow import DAG
from airflow.hooks.mysql_hook import MySqlHook
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
def fetch_data_mysql():
mysql_hook = MySqlHook(mysql_conn_id='mysql_default')
sql = 'SELECT * from random_table'
sql = "INSERT INTO random_table(text) VALUES ('Hi Adnan')"
print('INSERT MYSQL RESULT')
# results = mysql_hook.get_records(sql)
# results = mysql_hook.run(sql, autocommit=True, parameters=('Hi Addu',))
mysql_hook.run(sql, autocommit=True)
def print_world():
print('world')
return 'WORLD IN SEPTEMBER'
default_args = {
'owner': 'me',
'start_date': dt.datetime(2018, 9, 11),
'retries': 1,
'retry_delay': dt.timedelta(minutes=2),
}
with DAG('airflow_tutorial_v01',
default_args=default_args,
schedule_interval='0/5 * * * *',
) as dag:
print_hello = BashOperator(task_id='print_hello',
bash_command='echo "hello"')
sleep = BashOperator(task_id='sleep',
bash_command='sleep 5')
print_world = PythonOperator(task_id='print_world',
python_callable=print_world)
mysql_task = PythonOperator(task_id='mysql_tut', python_callable=fetch_data_mysql)
print_hello >> sleep >> print_world >> mysql_task
我正在使用 v1.10.0
。
这里给出日志的link:- https://www.dropbox.com/s/f0g64mhi8sgzlvw/my_simple_dag.py.log?dl=0
尝试将您的 cron 计划从 0/5 * * * *
更改为 */5 * * * *
。后者每五分钟一次,而前者似乎是根据 crontab.guru
的非标准 cron 语法
你狗在回填。如果您查看日志,其执行日期为 2018-09-20 00:15:00+00:00
、2018-09-20 00:20:00+00:00
、2018-09-20 00:25:00+00:00
,依此类推。
将以下内容添加到您的 default_args
:
'catchup_by_default': False
您的 default_args
应如下所示:
default_args = {
'owner': 'me',
'start_date': dt.datetime(2018, 9, 11),
'retries': 1,
'retry_delay': dt.timedelta(minutes=2),
'catchup_by_default': False,
}
我正在探索Apache Airflow
。我正在使用一种在 MySQL 中插入记录的方法。
我已将 DAG
安排为每 5 分钟后 运行 但它似乎没有发生,因为 MYSQL 时间戳告诉 MySQL 任务正在执行很多5 分钟以内。
如您所见,它在几分钟内插入了记录。下面是我的代码:
import datetime as dt
from airflow import DAG
from airflow.hooks.mysql_hook import MySqlHook
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
def fetch_data_mysql():
mysql_hook = MySqlHook(mysql_conn_id='mysql_default')
sql = 'SELECT * from random_table'
sql = "INSERT INTO random_table(text) VALUES ('Hi Adnan')"
print('INSERT MYSQL RESULT')
# results = mysql_hook.get_records(sql)
# results = mysql_hook.run(sql, autocommit=True, parameters=('Hi Addu',))
mysql_hook.run(sql, autocommit=True)
def print_world():
print('world')
return 'WORLD IN SEPTEMBER'
default_args = {
'owner': 'me',
'start_date': dt.datetime(2018, 9, 11),
'retries': 1,
'retry_delay': dt.timedelta(minutes=2),
}
with DAG('airflow_tutorial_v01',
default_args=default_args,
schedule_interval='0/5 * * * *',
) as dag:
print_hello = BashOperator(task_id='print_hello',
bash_command='echo "hello"')
sleep = BashOperator(task_id='sleep',
bash_command='sleep 5')
print_world = PythonOperator(task_id='print_world',
python_callable=print_world)
mysql_task = PythonOperator(task_id='mysql_tut', python_callable=fetch_data_mysql)
print_hello >> sleep >> print_world >> mysql_task
我正在使用 v1.10.0
。
这里给出日志的link:- https://www.dropbox.com/s/f0g64mhi8sgzlvw/my_simple_dag.py.log?dl=0
尝试将您的 cron 计划从 0/5 * * * *
更改为 */5 * * * *
。后者每五分钟一次,而前者似乎是根据 crontab.guru
你狗在回填。如果您查看日志,其执行日期为 2018-09-20 00:15:00+00:00
、2018-09-20 00:20:00+00:00
、2018-09-20 00:25:00+00:00
,依此类推。
将以下内容添加到您的 default_args
:
'catchup_by_default': False
您的 default_args
应如下所示:
default_args = {
'owner': 'me',
'start_date': dt.datetime(2018, 9, 11),
'retries': 1,
'retry_delay': dt.timedelta(minutes=2),
'catchup_by_default': False,
}