如何找出气流中的延迟工作

How to find out the delayed jobs in airflow

我的DAG有的在等排,有的在排队。我怀疑这种延迟是有原因的,但不确定如何开始调试这个问题。大多数管道是 运行 个 Spark 作业。

有人能帮我指点一下去哪里看 1) 分析哪些 DAG 被延迟了(没有在预定时间开始)2) 我应该去哪里寻找答案如果资源足够。我对 Airflow 中的日程安排还很陌生。非常感谢。如果我能更好地描述问题,请告诉我。

根据您的 Airflow 版本和设置,您应该能够直接查询 Airflow 数据库以获取此信息。

如果您使用的是 Airflow 1.x,UI 的“数据分析”选项卡中应该有一个“Ad Hoc Query”执行器。这在 2.x 中被禁用,所以如果你是 运行 2.x 你需要使用 psql 或类似的东西直接连接到你的 Airflow DB(这不同于Google 到 AWS 到 Docker).

进入后,查看 this link 有关 DAG 运行时的一些查询。

如果您正在寻找利用 Airflows 更广泛功能的代码。

airflow.models 中有三个模块可以利用。

  1. 为了以编程方式检索您的 Airflow 远离的所有 DAG,我们导入 DagBag。来自文档“dagbag 是 dag 的集合,从文件夹树中解析出来并且具有高
  2. 我们利用 DagModel and the method get_current,来初始化我们包中存在的每个 dag_id
  3. 我们使用 DagModel 检查是否有任何 DAG 处于活动状态 属性 is_paused
  4. 我们使用 DagRun.find
  5. 检索最新的 DAG 运行
  6. 将单个 dag 运行 按从早到晚排序
  7. 在这里你可以只对 [0] 进行子集得到 1,但是,为了你的调试目的,我只是循环遍历它们
  8. DagRunreturns很多资料供我们使用。在我的循环中,我有输出 print(i, run.state, run.execution_date, run.start_date)。所以你可以看到引擎盖下发生了什么。

id state dag_id queued_at execution_date start_date end_date run_id data_interval_start data_interval_end last_scheduling_decision

  1. 我已经为任何排队的 Dags 注释掉了 if 检查,供您取消注释。此外,您可以根据需要对日期进行一些算术运算,以添加更多条件功能。
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import DagBag, DagModel, DagRun
from airflow.operators.python import PythonOperator


# make a function that returns if a DAG is set to active or paused

def check_dag_active():
    bag = DagBag()
    for dag_id in bag.dags:
        in_bag = DagModel.get_current(dag_id)
        if not in_bag.is_paused:
            latest = DagRun.find(dag_id=dag_id)
            latest.sort(key=lambda x: x.execution_date, reverse=True)
            for i, run in enumerate(latest):
                print(i, run.state, run.execution_date, run.start_date)
                # if run.state == 'queued':
                #     return [run.dag_id, run.execution_date, run.start_date]

with DAG(
  'stack_overflow_ans_3',
  tags = ['SO'],
  start_date = datetime(2022, 1, 1),
  schedule_interval = None,
  catchup = False,
  is_paused_upon_creation = False
) as dag:

  t1 = PythonOperator(
    task_id = 'task_that_will_fail',
    python_callable = check_dag_active
  )