设置气流调度间隔
set airflow schedule interval
我在 airflow 中创建了任务,我计划每小时 运行 并且 start_date
设置为 2016-11-16
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2016, 11, 16),
'email': ['airflow@airflow.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG('test_hourly_job', default_args=default_args,schedule_interval="@hourly")
我在 10:00 AM
的当前时间启动了气流,我可以看到气流正在 运行 从 00:00 AM
开始,然后是 01:00 AM
等等:
INFO - Executing command: airflow run test_hourly_job task1 2016-11-16T00:00:00 --local -sd DAGS_FOLDER/test_airflow.py
........
........
INFO - Executing command: airflow run test_hourly_job task1 2016-11-16T01:00:00 --local -sd DAGS_FOLDER/test_airflow.py
.......
.......
如何将气流配置为从当前时间开始,然后每小时 运行 开始,而不是从 00:00
开始?
在你的问题中你写了字典:default_args
In this there is Key: 'start_date': datetime(2016, 11, 16)
实际上这里创建了具有输入 YYYY/MM/DD 格式的日期时间对象,我们不提供时间输入,因此它采用默认值 00:00,因此您的脚本在时间 00:00 运行
你可以这样检查:在 python
from datetime import datetime
datetime(2016, 11, 16)
#That Datetime object is generated with 00:00 Time
#datetime(2016, 11, 16, 0, 0)
#If you need Current date and time to start process you can set value as:
'start_date': datetime.now()
#if you want only current time with respective date then you can use as fallows:
current_date = datetime.now()
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2016, 11, 16, current_date.hour, current_date.minute),
'email': ['airflow@airflow.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG('test_hourly_job', default_args=default_args,schedule_interval="@hourly")
- 在 python 虚拟环境中安装 airflow。
- 激活环境。
- 在
~/airflow/airflow.cfg
中重置 load_examples = False
- 启动气流。 $
airflow webserver -p <port>
- 将下面的代码复制到
~/airflow/dags
- 启动调度程序
$ airflow scheduler
现在调度间隔见下面的代码。
试试这个:
'start_date': datetime.now()
dag = DAG('tutorial', default_args=default_args, schedule_interval="* * * * *")
或
'start_date': datetime(2015, 6, 1),
dag = DAG('tutorial', default_args=default_args, schedule_interval="@hourly")
完整代码
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
#'start_date': datetime(2015, 6, 1),
'start_date': datetime.now(),
'email': ['airflow@airflow.com'],
'email_on_failure': False,
'email_on_retry': False,
#'retries': 1,
#'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG('tutorial', default_args=default_args, schedule_interval="* * * * *") // For minute
#dag = DAG('tutorial', default_args=default_args, schedule_interval="@hourly")
#
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
t2.set_upstream(t1)
t3.set_upstream(t1)
Airflow 提供 gem 一个名为 LatestOnlyOperator 的运算符,以跳过最近计划 运行 期间未被 运行 的任务一个DAG。如果现在的时间不在其 execution_time 和下一个计划的 execution_time 之间,则 LatestOnlyOperator 将跳过所有直接的下游任务及其自身。该运算符减少了 CPU 周期的浪费。
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2016, 11, 16),
'email': ['airflow@airflow.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('test_hourly_job', default_args=default_args,schedule_interval="@hourly")
latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
task1 = DummyOperator(task_id='task1', dag=dag)
latest_only >> task
Latest_only 应始终位于要跳过的任务的上游。
latest_only 运算符的优点是,每当您重新启动 dag 时,它将跳过所有以前的任务和 运行 当前 dag。
另外最好不要硬编码开始时间。而是输入:
from datetime import datetime, timedelta
START_DATE = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())
我在 airflow 中创建了任务,我计划每小时 运行 并且 start_date
设置为 2016-11-16
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2016, 11, 16),
'email': ['airflow@airflow.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG('test_hourly_job', default_args=default_args,schedule_interval="@hourly")
我在 10:00 AM
的当前时间启动了气流,我可以看到气流正在 运行 从 00:00 AM
开始,然后是 01:00 AM
等等:
INFO - Executing command: airflow run test_hourly_job task1 2016-11-16T00:00:00 --local -sd DAGS_FOLDER/test_airflow.py
........
........
INFO - Executing command: airflow run test_hourly_job task1 2016-11-16T01:00:00 --local -sd DAGS_FOLDER/test_airflow.py
.......
.......
如何将气流配置为从当前时间开始,然后每小时 运行 开始,而不是从 00:00
开始?
在你的问题中你写了字典:default_args
In this there is Key: 'start_date': datetime(2016, 11, 16)
实际上这里创建了具有输入 YYYY/MM/DD 格式的日期时间对象,我们不提供时间输入,因此它采用默认值 00:00,因此您的脚本在时间 00:00 运行 你可以这样检查:在 python
from datetime import datetime
datetime(2016, 11, 16)
#That Datetime object is generated with 00:00 Time
#datetime(2016, 11, 16, 0, 0)
#If you need Current date and time to start process you can set value as:
'start_date': datetime.now()
#if you want only current time with respective date then you can use as fallows:
current_date = datetime.now()
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2016, 11, 16, current_date.hour, current_date.minute),
'email': ['airflow@airflow.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG('test_hourly_job', default_args=default_args,schedule_interval="@hourly")
- 在 python 虚拟环境中安装 airflow。
- 激活环境。
- 在
~/airflow/airflow.cfg
中重置 - 启动气流。 $
airflow webserver -p <port>
- 将下面的代码复制到
~/airflow/dags
- 启动调度程序
$ airflow scheduler
load_examples = False
现在调度间隔见下面的代码。
试试这个:
'start_date': datetime.now()
dag = DAG('tutorial', default_args=default_args, schedule_interval="* * * * *")
或
'start_date': datetime(2015, 6, 1),
dag = DAG('tutorial', default_args=default_args, schedule_interval="@hourly")
完整代码
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
#'start_date': datetime(2015, 6, 1),
'start_date': datetime.now(),
'email': ['airflow@airflow.com'],
'email_on_failure': False,
'email_on_retry': False,
#'retries': 1,
#'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG('tutorial', default_args=default_args, schedule_interval="* * * * *") // For minute
#dag = DAG('tutorial', default_args=default_args, schedule_interval="@hourly")
#
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
t2.set_upstream(t1)
t3.set_upstream(t1)
Airflow 提供 gem 一个名为 LatestOnlyOperator 的运算符,以跳过最近计划 运行 期间未被 运行 的任务一个DAG。如果现在的时间不在其 execution_time 和下一个计划的 execution_time 之间,则 LatestOnlyOperator 将跳过所有直接的下游任务及其自身。该运算符减少了 CPU 周期的浪费。
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2016, 11, 16),
'email': ['airflow@airflow.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('test_hourly_job', default_args=default_args,schedule_interval="@hourly")
latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
task1 = DummyOperator(task_id='task1', dag=dag)
latest_only >> task
Latest_only 应始终位于要跳过的任务的上游。 latest_only 运算符的优点是,每当您重新启动 dag 时,它将跳过所有以前的任务和 运行 当前 dag。
另外最好不要硬编码开始时间。而是输入:
from datetime import datetime, timedelta
START_DATE = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())