如何找出气流中的延迟工作
How to find out the delayed jobs in airflow
我的DAG有的在等排,有的在排队。我怀疑这种延迟是有原因的,但不确定如何开始调试这个问题。大多数管道是 运行 个 Spark 作业。
有人能帮我指点一下去哪里看 1) 分析哪些 DAG 被延迟了(没有在预定时间开始)2) 我应该去哪里寻找答案如果资源足够。我对 Airflow 中的日程安排还很陌生。非常感谢。如果我能更好地描述问题,请告诉我。
根据您的 Airflow 版本和设置,您应该能够直接查询 Airflow 数据库以获取此信息。
如果您使用的是 Airflow 1.x,UI 的“数据分析”选项卡中应该有一个“Ad Hoc Query”执行器。这在 2.x 中被禁用,所以如果你是 运行 2.x 你需要使用 psql
或类似的东西直接连接到你的 Airflow DB(这不同于Google 到 AWS 到 Docker).
进入后,查看 this link 有关 DAG 运行时的一些查询。
如果您正在寻找利用 Airflows 更广泛功能的代码。
airflow.models
中有三个模块可以利用。
- 为了以编程方式检索您的 Airflow 远离的所有 DAG,我们导入 DagBag。来自文档“dagbag 是 dag 的集合,从文件夹树中解析出来并且具有高”
- 我们利用 DagModel and the method get_current,来初始化我们包中存在的每个 dag_id
- 我们使用 DagModel 检查是否有任何 DAG 处于活动状态 属性 is_paused
- 我们使用 DagRun.find
检索最新的 DAG 运行
- 将单个 dag 运行 按从早到晚排序
- 在这里你可以只对 [0] 进行子集得到 1,但是,为了你的调试目的,我只是循环遍历它们
- DagRunreturns很多资料供我们使用。在我的循环中,我有输出
print(i, run.state, run.execution_date, run.start_date)
。所以你可以看到引擎盖下发生了什么。
id
state
dag_id
queued_at
execution_date
start_date
end_date
run_id
data_interval_start
data_interval_end
last_scheduling_decision
- 我已经为任何排队的 Dags 注释掉了 if 检查,供您取消注释。此外,您可以根据需要对日期进行一些算术运算,以添加更多条件功能。
from datetime import datetime, timedelta
from airflow import DAG
from airflow.models import DagBag, DagModel, DagRun
from airflow.operators.python import PythonOperator
# make a function that returns if a DAG is set to active or paused
def check_dag_active():
bag = DagBag()
for dag_id in bag.dags:
in_bag = DagModel.get_current(dag_id)
if not in_bag.is_paused:
latest = DagRun.find(dag_id=dag_id)
latest.sort(key=lambda x: x.execution_date, reverse=True)
for i, run in enumerate(latest):
print(i, run.state, run.execution_date, run.start_date)
# if run.state == 'queued':
# return [run.dag_id, run.execution_date, run.start_date]
with DAG(
'stack_overflow_ans_3',
tags = ['SO'],
start_date = datetime(2022, 1, 1),
schedule_interval = None,
catchup = False,
is_paused_upon_creation = False
) as dag:
t1 = PythonOperator(
task_id = 'task_that_will_fail',
python_callable = check_dag_active
)
我的DAG有的在等排,有的在排队。我怀疑这种延迟是有原因的,但不确定如何开始调试这个问题。大多数管道是 运行 个 Spark 作业。
有人能帮我指点一下去哪里看 1) 分析哪些 DAG 被延迟了(没有在预定时间开始)2) 我应该去哪里寻找答案如果资源足够。我对 Airflow 中的日程安排还很陌生。非常感谢。如果我能更好地描述问题,请告诉我。
根据您的 Airflow 版本和设置,您应该能够直接查询 Airflow 数据库以获取此信息。
如果您使用的是 Airflow 1.x,UI 的“数据分析”选项卡中应该有一个“Ad Hoc Query”执行器。这在 2.x 中被禁用,所以如果你是 运行 2.x 你需要使用 psql
或类似的东西直接连接到你的 Airflow DB(这不同于Google 到 AWS 到 Docker).
进入后,查看 this link 有关 DAG 运行时的一些查询。
如果您正在寻找利用 Airflows 更广泛功能的代码。
airflow.models
中有三个模块可以利用。
- 为了以编程方式检索您的 Airflow 远离的所有 DAG,我们导入 DagBag。来自文档“dagbag 是 dag 的集合,从文件夹树中解析出来并且具有高”
- 我们利用 DagModel and the method get_current,来初始化我们包中存在的每个 dag_id
- 我们使用 DagModel 检查是否有任何 DAG 处于活动状态 属性 is_paused
- 我们使用 DagRun.find 检索最新的 DAG 运行
- 将单个 dag 运行 按从早到晚排序
- 在这里你可以只对 [0] 进行子集得到 1,但是,为了你的调试目的,我只是循环遍历它们
- DagRunreturns很多资料供我们使用。在我的循环中,我有输出
print(i, run.state, run.execution_date, run.start_date)
。所以你可以看到引擎盖下发生了什么。
id state dag_id queued_at execution_date start_date end_date run_id data_interval_start data_interval_end last_scheduling_decision
- 我已经为任何排队的 Dags 注释掉了 if 检查,供您取消注释。此外,您可以根据需要对日期进行一些算术运算,以添加更多条件功能。
from datetime import datetime, timedelta
from airflow import DAG
from airflow.models import DagBag, DagModel, DagRun
from airflow.operators.python import PythonOperator
# make a function that returns if a DAG is set to active or paused
def check_dag_active():
bag = DagBag()
for dag_id in bag.dags:
in_bag = DagModel.get_current(dag_id)
if not in_bag.is_paused:
latest = DagRun.find(dag_id=dag_id)
latest.sort(key=lambda x: x.execution_date, reverse=True)
for i, run in enumerate(latest):
print(i, run.state, run.execution_date, run.start_date)
# if run.state == 'queued':
# return [run.dag_id, run.execution_date, run.start_date]
with DAG(
'stack_overflow_ans_3',
tags = ['SO'],
start_date = datetime(2022, 1, 1),
schedule_interval = None,
catchup = False,
is_paused_upon_creation = False
) as dag:
t1 = PythonOperator(
task_id = 'task_that_will_fail',
python_callable = check_dag_active
)