气流未正确安排 Python
Airflow not scheduling Correctly Python
代码:
Python 版本 2.7.x 和气流版本 1.5.1
我的 dag 脚本是这样的
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'xyz',
'depends_on_past': False,
'start_date': datetime(2015,10,13),
'email': ['xyz@email.in'],
'schedule_interval':timedelta(minutes=5),
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('testing', default_args=default_args)
run_this_first = BashOperator(task_id='Start1',bash_command='date', dag=dag)
for i in range(5):
t = BashOperator(task_id="Orders1"+str(i), bash_command='sleep 5',dag=dag)
t.set_upstream(run_this_first)
由此您可以看到我正在创建一个包含 6 个任务的 DAG,第一个任务 (Start1) 首先启动,然后所有其他五个任务启动
目前我在 DAG 启动之间设置了 5 分钟的时间延迟
第一种类型的所有六个任务都运行完美,但五分钟后 DAG 没有重新启动
1 个小时多了,DAG 还没有重新启动我真的不知道我错了。
如果有人能指出我哪里出了问题,那就太好了。我尝试使用 airflow testing clear
进行清除,然后对同样的事情进行清除 happen.It 运行 第一个实例然后就站着那里。
命令行显示的唯一内容是 Getting all instance for DAG testing
当我更改 schedule_interval 的位置时,它只是在没有任何计划间隔的情况下运行 parallel.That 在 5 分钟内完成了 300 个或更多任务实例。没有 5 分钟的计划间隔
代码 2:
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'xyz',
'depends_on_past': False,
'start_date': datetime(2015,10,13),
'email': ['xyz@email.in'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('testing',schedule_interval=timedelta(minutes=5),default_args=default_args)#Schedule here
run_this_first = BashOperator(task_id='Start1',bash_command='date', dag=dag)
for i in range(5):
t = BashOperator(task_id="Orders1"+str(i), bash_command='sleep 5',dag=dag)
t.set_upstream(run_this_first)
对于代码2,我猜它每分钟运行的原因是:
开始时间是2015-10-1300:00
计划间隔为 5 分钟
调度程序的每个心跳(默认为 5 秒),您的 DAG 将被检查
- 首先检查:开始日期(未找到最后执行日期)+ 调度程序
间隔<当前时间?如果是,DAG 将被执行并持续
执行时间将被记录。 (例如 2015-10-13 00:00 + 5min < current?)
- 二次检测下一次心跳:上次执行时间+调度器
间隔<当前时间?如果是这样,DAG 将再次执行。
- .....
解决方案是将DAGstart_date设置为datetime.now() - schedule_interval
。
如果你想调试:
在 settings.py
中将 LOGGINGLEVEL 设置为 debug
将airflow.models.TaskInstance
的class方法is_queueable()
修改为
:
def is_queueable(self, flag_upstream_failed=False):
logging.debug('Checking whether task instance is queueable or not!')
if self.execution_date > datetime.now() - self.task.schedule_interval:
logging.debug('Too early to execute: execution_date {0} + task.schedule_interval {1} > datetime.now() {2}'.format(self.execution_date, self.task.schedule_interval, datetime.now()))
return False
...
因为开始时间(2015-10-13 00:00) 小于现在时间,所以触发气流backfill。它会 运行 从 2015-10-13 00:00 气流调度程序检测到的每一秒(它的开始日期),但执行日期在 5 分钟(任务间隔时间)之间。
查看日志名称:
$tree airflow/logs/testing/
testing/
|-- Orders10
| |-- 2015-10-13T00:00:00
| |-- 2015-10-13T00:05:00
| -- 2015-10-13T00:10:00
|-- Orders11
| |-- 2015-10-13T00:00:00
| |-- 2015-10-13T00:05:00
| -- 2015-10-13T00:10:00
|-- Orders12
| |-- 2015-10-13T00:00:00
| |-- 2015-10-13T00:05:00
| -- 2015-10-13T00:10:00
|-- Orders13
| |-- 2015-10-13T00:00:00
| |-- 2015-10-13T00:05:00
| -- 2015-10-13T00:10:00
|-- Orders14
| |-- 2015-10-13T00:00:00
| |-- 2015-10-13T00:05:00
| -- 2015-10-13T00:10:00
-- Start1
|-- 2015-10-13T00:00:00
|-- 2015-10-13T00:05:00
|-- 2015-10-13T00:10:00
-- 2015-10-13T00:15:00
查看日志创建时间:
$ll airflow/logs/testing/Start1
-rw-rw-r-- 1 admin admin 4192 Nov 9 14:50 2015-10-13T00:00:00
-rw-rw-r-- 1 admin admin 4192 Nov 9 14:50 2015-10-13T00:05:00
-rw-rw-r-- 1 admin admin 4192 Nov 9 14:51 2015-10-13T00:10:00
-rw-rw-r-- 1 admin admin 4192 Nov 9 14:52 2015-10-13T00:15:00
此外,您还可以在网络上查看任务实例 UI:
代码:
Python 版本 2.7.x 和气流版本 1.5.1
我的 dag 脚本是这样的
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'xyz',
'depends_on_past': False,
'start_date': datetime(2015,10,13),
'email': ['xyz@email.in'],
'schedule_interval':timedelta(minutes=5),
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('testing', default_args=default_args)
run_this_first = BashOperator(task_id='Start1',bash_command='date', dag=dag)
for i in range(5):
t = BashOperator(task_id="Orders1"+str(i), bash_command='sleep 5',dag=dag)
t.set_upstream(run_this_first)
由此您可以看到我正在创建一个包含 6 个任务的 DAG,第一个任务 (Start1) 首先启动,然后所有其他五个任务启动
目前我在 DAG 启动之间设置了 5 分钟的时间延迟
第一种类型的所有六个任务都运行完美,但五分钟后 DAG 没有重新启动
1 个小时多了,DAG 还没有重新启动我真的不知道我错了。
如果有人能指出我哪里出了问题,那就太好了。我尝试使用 airflow testing clear
进行清除,然后对同样的事情进行清除 happen.It 运行 第一个实例然后就站着那里。
命令行显示的唯一内容是 Getting all instance for DAG testing
当我更改 schedule_interval 的位置时,它只是在没有任何计划间隔的情况下运行 parallel.That 在 5 分钟内完成了 300 个或更多任务实例。没有 5 分钟的计划间隔
代码 2:
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'xyz',
'depends_on_past': False,
'start_date': datetime(2015,10,13),
'email': ['xyz@email.in'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('testing',schedule_interval=timedelta(minutes=5),default_args=default_args)#Schedule here
run_this_first = BashOperator(task_id='Start1',bash_command='date', dag=dag)
for i in range(5):
t = BashOperator(task_id="Orders1"+str(i), bash_command='sleep 5',dag=dag)
t.set_upstream(run_this_first)
对于代码2,我猜它每分钟运行的原因是:
开始时间是2015-10-1300:00
计划间隔为 5 分钟
调度程序的每个心跳(默认为 5 秒),您的 DAG 将被检查
- 首先检查:开始日期(未找到最后执行日期)+ 调度程序 间隔<当前时间?如果是,DAG 将被执行并持续 执行时间将被记录。 (例如 2015-10-13 00:00 + 5min < current?)
- 二次检测下一次心跳:上次执行时间+调度器 间隔<当前时间?如果是这样,DAG 将再次执行。
- .....
解决方案是将DAGstart_date设置为datetime.now() - schedule_interval
。
如果你想调试:
在 settings.py
中将 LOGGINGLEVEL 设置为 将
airflow.models.TaskInstance
的class方法is_queueable()
修改为
debug
:
def is_queueable(self, flag_upstream_failed=False):
logging.debug('Checking whether task instance is queueable or not!')
if self.execution_date > datetime.now() - self.task.schedule_interval:
logging.debug('Too early to execute: execution_date {0} + task.schedule_interval {1} > datetime.now() {2}'.format(self.execution_date, self.task.schedule_interval, datetime.now()))
return False
...
因为开始时间(2015-10-13 00:00) 小于现在时间,所以触发气流backfill。它会 运行 从 2015-10-13 00:00 气流调度程序检测到的每一秒(它的开始日期),但执行日期在 5 分钟(任务间隔时间)之间。
查看日志名称:
$tree airflow/logs/testing/
testing/
|-- Orders10
| |-- 2015-10-13T00:00:00
| |-- 2015-10-13T00:05:00
| -- 2015-10-13T00:10:00
|-- Orders11
| |-- 2015-10-13T00:00:00
| |-- 2015-10-13T00:05:00
| -- 2015-10-13T00:10:00
|-- Orders12
| |-- 2015-10-13T00:00:00
| |-- 2015-10-13T00:05:00
| -- 2015-10-13T00:10:00
|-- Orders13
| |-- 2015-10-13T00:00:00
| |-- 2015-10-13T00:05:00
| -- 2015-10-13T00:10:00
|-- Orders14
| |-- 2015-10-13T00:00:00
| |-- 2015-10-13T00:05:00
| -- 2015-10-13T00:10:00
-- Start1
|-- 2015-10-13T00:00:00
|-- 2015-10-13T00:05:00
|-- 2015-10-13T00:10:00
-- 2015-10-13T00:15:00
查看日志创建时间:
$ll airflow/logs/testing/Start1
-rw-rw-r-- 1 admin admin 4192 Nov 9 14:50 2015-10-13T00:00:00
-rw-rw-r-- 1 admin admin 4192 Nov 9 14:50 2015-10-13T00:05:00
-rw-rw-r-- 1 admin admin 4192 Nov 9 14:51 2015-10-13T00:10:00
-rw-rw-r-- 1 admin admin 4192 Nov 9 14:52 2015-10-13T00:15:00
此外,您还可以在网络上查看任务实例 UI: