运行 每 X 分钟一个 Airflow DAG
Running an Airflow DAG every X minutes
我在使用 LocalScheduler
选项的 EC2 实例上使用气流。我调用了 airflow scheduler
和 airflow webserver
并且一切似乎 运行 都很好。也就是说,在为 "do this every 10 minutes," '*/10 * * * *'
向 schedule_interval
提供 cron
字符串后,默认情况下,该作业继续每 24 小时执行一次。这是代码的 header:
from datetime import datetime
import os
import sys
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
import ds_dependencies
SCRIPT_PATH = os.getenv('PREPROC_PATH')
if SCRIPT_PATH:
sys.path.insert(0, SCRIPT_PATH)
import workers
else:
print('Define PREPROC_PATH value in environmental variables')
sys.exit(1)
default_args = {
'start_date': datetime(2017, 9, 9, 10, 0, 0, 0), #..EC2 time. Equal to 11pm hora México
'max_active_runs': 1,
'concurrency': 4,
'schedule_interval': '*/10 * * * *' #..every 10 minutes
}
DAG = DAG(
dag_id='dash_update',
default_args=default_args
)
...
default_args 仅用于填充传递给 DAG 中的运算符的参数。 max_active_runs
、concurrency
和 schedule_interval
都是用于初始化 DAG 的参数,而不是运算符。这就是你想要的:
DAG = DAG(
dag_id='dash_update',
start_date=datetime(2017, 9, 9, 10, 0, 0, 0), #..EC2 time. Equal to 11pm hora México
max_active_runs=1,
concurrency=4,
schedule_interval='*/10 * * * *', #..every 10 minutes
default_args=default_args,
)
我之前也把它们混在一起了,所以供参考(注意有重叠):
DAG 参数:https://airflow.incubator.apache.org/code.html?highlight=dag#airflow.models.DAG
运算符参数:https://airflow.incubator.apache.org/code.html#baseoperator
对于 >2.1 的气流版本,您可以使用 datetime.timedelta()
对象:
DAG = DAG(
dag_id='dash_update',
start_date=datetime(2017, 9, 9, 10, 0, 0, 0),
max_active_runs=1,
concurrency=4,
schedule_interval=timedelta(minutes=10),
default_args=default_args,
)
处理 start_date 的另一个很酷的功能是 days_ago
from airflow.utils.dates import days_ago
DAG = DAG(
dag_id='dash_update',
start_date=days_ago(2, minute=15), # would start 2 days ago at 00:15
max_active_runs=1,
concurrency=4,
schedule_interval=timedelta(minutes=10),
default_args=default_args,
)
我在使用 LocalScheduler
选项的 EC2 实例上使用气流。我调用了 airflow scheduler
和 airflow webserver
并且一切似乎 运行 都很好。也就是说,在为 "do this every 10 minutes," '*/10 * * * *'
向 schedule_interval
提供 cron
字符串后,默认情况下,该作业继续每 24 小时执行一次。这是代码的 header:
from datetime import datetime
import os
import sys
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
import ds_dependencies
SCRIPT_PATH = os.getenv('PREPROC_PATH')
if SCRIPT_PATH:
sys.path.insert(0, SCRIPT_PATH)
import workers
else:
print('Define PREPROC_PATH value in environmental variables')
sys.exit(1)
default_args = {
'start_date': datetime(2017, 9, 9, 10, 0, 0, 0), #..EC2 time. Equal to 11pm hora México
'max_active_runs': 1,
'concurrency': 4,
'schedule_interval': '*/10 * * * *' #..every 10 minutes
}
DAG = DAG(
dag_id='dash_update',
default_args=default_args
)
...
default_args 仅用于填充传递给 DAG 中的运算符的参数。 max_active_runs
、concurrency
和 schedule_interval
都是用于初始化 DAG 的参数,而不是运算符。这就是你想要的:
DAG = DAG(
dag_id='dash_update',
start_date=datetime(2017, 9, 9, 10, 0, 0, 0), #..EC2 time. Equal to 11pm hora México
max_active_runs=1,
concurrency=4,
schedule_interval='*/10 * * * *', #..every 10 minutes
default_args=default_args,
)
我之前也把它们混在一起了,所以供参考(注意有重叠):
DAG 参数:https://airflow.incubator.apache.org/code.html?highlight=dag#airflow.models.DAG 运算符参数:https://airflow.incubator.apache.org/code.html#baseoperator
对于 >2.1 的气流版本,您可以使用 datetime.timedelta()
对象:
DAG = DAG(
dag_id='dash_update',
start_date=datetime(2017, 9, 9, 10, 0, 0, 0),
max_active_runs=1,
concurrency=4,
schedule_interval=timedelta(minutes=10),
default_args=default_args,
)
处理 start_date 的另一个很酷的功能是 days_ago
from airflow.utils.dates import days_ago
DAG = DAG(
dag_id='dash_update',
start_date=days_ago(2, minute=15), # would start 2 days ago at 00:15
max_active_runs=1,
concurrency=4,
schedule_interval=timedelta(minutes=10),
default_args=default_args,
)