apache 气流调度器不调度作业
apache airflow scheduler not scheduling jobs
我正在与 apache airflow 1.8.0
合作。
这是我backfill
作业时的输出。
[2017-04-13 09:42:55,857] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:43:00 [scheduled]>
[2017-04-13 09:42:55,857] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:43:00 [scheduled]>
[2017-04-13 09:42:55,857] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:45:00 [scheduled]>
[2017-04-13 09:42:55,858] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:45:00 [scheduled]>
[2017-04-13 09:42:55,858] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:43:00 [scheduled]>
[2017-04-13 09:42:55,858] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:43:00 [scheduled]>
[2017-04-13 09:42:55,858] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:44:00 [scheduled]>
[2017-04-13 09:42:55,858] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:44:00 [scheduled]>
[2017-04-13 09:42:55,864] {models.py:1120} INFO - Dependencies not met for <TaskInstance: example_bash_operator.run_after_loop 2017-04-13 13:44:00 [scheduled]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 3 non-success(es). upstream_tasks_state={'skipped': Decimal('0'), 'successes': Decimal('0'), 'done': 0, 'upstream_failed': Decimal('0'), 'failed': Decimal('0')}, upstream_task_ids=['runme_0', 'runme_1', 'runme_2']
当我尝试安排任何 DAG
时它抛出错误。
Traceback (most recent call last):
File "/anaconda3/bin/airflow", line 28, in <module>
args.func(args)
File "/anaconda3/lib/python3.5/site-packages/airflow/bin/cli.py", line 167, in backfill
pool=args.pool)
File "/anaconda3/lib/python3.5/site-packages/airflow/models.py", line 3330, in run
job.run()
File "/anaconda3/lib/python3.5/site-packages/airflow/jobs.py", line 200, in run
self._execute()
File "/anaconda3/lib/python3.5/site-packages/airflow/jobs.py", line 2021, in _execute
raise AirflowException(err)
airflow.exceptions.AirflowException: ---------------------------------------------------
这里是关于任务的输出。
BackfillJob is deadlocked. These tasks have succeeded:
set()
These tasks have started:
{}
These tasks have failed:
set()
These tasks are skipped:
set()
These tasks are deadlocked:
{<TaskInstance: example_bash_operator.runme_0 2017-04-13 13:44:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:44:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_0 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:44:00 [scheduled]>, <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_0 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.run_this_last 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.run_after_loop 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.run_after_loop 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.run_this_last 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.run_this_last 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_0 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.run_after_loop 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:44:00 [scheduled]>, <TaskInstance: example_bash_operator.run_after_loop 2017-04-13 13:44:00 [scheduled]>, <TaskInstance: example_bash_operator.run_this_last 2017-04-13 13:44:00 [scheduled]>}
测试 python 2.7 和 python 3.5
使用了SequentialExecutor和LocalExecutor
PS。如果我在当前时间回填DAG,它执行一次,然后为所有计划任务抛出上述错误。
您的气流实例处于死锁状态。失败的任务不允许未来 运行s 的任务。
Airflow 在每个 dag 运行 中将每个任务作为一个新进程启动,当任务失败并且未处理时出现死锁情况
要解决这种情况,您可以执行以下操作之一:
- 使用
airflow clear <<dag_id>>
这将解决死锁并允许 DAG/task 的未来 运行
- 如果以上没有解决问题,您需要
use airflow resetdb
这将清除气流数据库,从而解决问题
将来,
- 尝试并使用
execution_timeout=timedelta(minutes=2)
设置一些超时,以便您可以明确控制运算符
- 此外,请提供一个
on_failure_callback=handle_failure
,它会在操作员失败时干净地存在
希望这对您有所帮助,
干杯!
我正在与 apache airflow 1.8.0
合作。
这是我backfill
作业时的输出。
[2017-04-13 09:42:55,857] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:43:00 [scheduled]>
[2017-04-13 09:42:55,857] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:43:00 [scheduled]>
[2017-04-13 09:42:55,857] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:45:00 [scheduled]>
[2017-04-13 09:42:55,858] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:45:00 [scheduled]>
[2017-04-13 09:42:55,858] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:43:00 [scheduled]>
[2017-04-13 09:42:55,858] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:43:00 [scheduled]>
[2017-04-13 09:42:55,858] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:44:00 [scheduled]>
[2017-04-13 09:42:55,858] {models.py:1126} INFO - Dependencies all met for <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:44:00 [scheduled]>
[2017-04-13 09:42:55,864] {models.py:1120} INFO - Dependencies not met for <TaskInstance: example_bash_operator.run_after_loop 2017-04-13 13:44:00 [scheduled]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 3 non-success(es). upstream_tasks_state={'skipped': Decimal('0'), 'successes': Decimal('0'), 'done': 0, 'upstream_failed': Decimal('0'), 'failed': Decimal('0')}, upstream_task_ids=['runme_0', 'runme_1', 'runme_2']
当我尝试安排任何 DAG
时它抛出错误。
Traceback (most recent call last):
File "/anaconda3/bin/airflow", line 28, in <module>
args.func(args)
File "/anaconda3/lib/python3.5/site-packages/airflow/bin/cli.py", line 167, in backfill
pool=args.pool)
File "/anaconda3/lib/python3.5/site-packages/airflow/models.py", line 3330, in run
job.run()
File "/anaconda3/lib/python3.5/site-packages/airflow/jobs.py", line 200, in run
self._execute()
File "/anaconda3/lib/python3.5/site-packages/airflow/jobs.py", line 2021, in _execute
raise AirflowException(err)
airflow.exceptions.AirflowException: ---------------------------------------------------
这里是关于任务的输出。
BackfillJob is deadlocked. These tasks have succeeded:
set()
These tasks have started:
{}
These tasks have failed:
set()
These tasks are skipped:
set()
These tasks are deadlocked:
{<TaskInstance: example_bash_operator.runme_0 2017-04-13 13:44:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:44:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_0 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:44:00 [scheduled]>, <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_0 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.run_this_last 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.run_after_loop 2017-04-13 13:46:00 [scheduled]>, <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.run_after_loop 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.run_this_last 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.run_this_last 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_0 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.run_after_loop 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_1 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:45:00 [scheduled]>, <TaskInstance: example_bash_operator.runme_2 2017-04-13 13:43:00 [scheduled]>, <TaskInstance: example_bash_operator.also_run_this 2017-04-13 13:44:00 [scheduled]>, <TaskInstance: example_bash_operator.run_after_loop 2017-04-13 13:44:00 [scheduled]>, <TaskInstance: example_bash_operator.run_this_last 2017-04-13 13:44:00 [scheduled]>}
测试 python 2.7 和 python 3.5
使用了SequentialExecutor和LocalExecutor
PS。如果我在当前时间回填DAG,它执行一次,然后为所有计划任务抛出上述错误。
您的气流实例处于死锁状态。失败的任务不允许未来 运行s 的任务。
Airflow 在每个 dag 运行 中将每个任务作为一个新进程启动,当任务失败并且未处理时出现死锁情况
要解决这种情况,您可以执行以下操作之一:
- 使用
airflow clear <<dag_id>>
这将解决死锁并允许 DAG/task 的未来 运行
- 如果以上没有解决问题,您需要
use airflow resetdb
这将清除气流数据库,从而解决问题
将来,
- 尝试并使用
execution_timeout=timedelta(minutes=2)
设置一些超时,以便您可以明确控制运算符 - 此外,请提供一个
on_failure_callback=handle_failure
,它会在操作员失败时干净地存在
希望这对您有所帮助,
干杯!