如何在 Airflow 中设置不同 DAG 的优先级

How to set priority across different DAGs in Airflow

假设我们有两个 DAG,dag1 和 dag2,它们满足不同的业务需求。他们完全无关。但dag1越早完成越重要
为了简单起见,他们都只有一项任务,而且他们每天 运行。

在 dag1 落后计划 2 或 3 天的情况下,我想确保 dag1 运行s 并首先完成其 dag_runs,即 dag1 是最新的接下来 dag2 能够继续。

我试过 priority_weight 但它在不同的 dag 上不起作用。

我需要一种方法将来自两个不同 dag 的任务放在同一个队列中并实现 DAG 级别的优先级排序。

我找到了一个 ad-hoc 解决方案,我可以简单地将两个 dag 包裹在一个锁定层中。

我的意思是,我们在开始时添加了一个简单的任务来锁定数据库中的特定行,然后在 dag 的末尾,我们还添加了一个简单的任务来解锁锁定的行。
因此,当两个 dags 之一当前正在执行而另一个想要开始时,它只会被阻塞,因为它无法锁定两个 dags 已知的特定行。

下面对锁定层进行简单的说明
dag1: lock_operator, task1, unlock_operator.
dag2: lock_operator, task1, unlock_operator.

当然,如果lock_operator无法锁定该行,我们可以让lock_operator失败,并将retries_count设置为非常高,这样我们保证,它仍然会重试锁定直到它可以。

来自External Task Sensor的官方文档:

Waits for a different DAG or a task in a different DAG to complete for
a specific execution_date.

    :param external_dag_id: The dag_id that contains the task you want to
        wait for
    :type external_dag_id: str
    :param external_task_id: The task_id that contains the task you want to
        wait for. If ``None`` the sensor waits for the DAG
    :type external_task_id: str
    :param allowed_states: list of allowed states, default is ``['success']``
    :type allowed_states: list
    :param execution_delta: time difference with the previous execution to
        look at, the default is the same execution_date as the current task or DAG.
        For yesterday, use [positive!] datetime.timedelta(days=1). Either
        execution_delta or execution_date_fn can be passed to
        ExternalTaskSensor, but not both.
    :type execution_delta: datetime.timedelta
    :param execution_date_fn: function that receives the current execution date
        and returns the desired execution dates to query. Either execution_delta
        or execution_date_fn can be passed to ExternalTaskSensor, but not both.
    :type execution_date_fn: callable
    :param check_existence: Set to `True` to check if the external task exists (when
        external_task_id is not None) or check if the DAG to wait for exists (when
        external_task_id is None), and immediately cease waiting if the external task
        or DAG does not exist (default value: False).
    :type check_existence: bool

两个 DAG 都应将 depends_on_past Trigger Rule 设置为 True 以便较新的计划 DAG 运行仅在先前计划的运行成功完成时才会执行。

然后在 Dag 2(稍后执行的那个)的开头添加外部任务传感器。

或者,您可以创建自己的自定义传感器并通过 Airflow Plugins 使用它,以检查元数据数据库中 Dag Runs 的状态。

您还可以构建客户传感器,将 Airflow XCOMs or Airflow Variables to pass execution run times or any other Airflow Macro 用于 DAG 2 中的传感器。

你的问题有点含糊,因为你需要什么是一种依赖管理方法,例如 ExternalTask​​Sensor 可以解决问题,但是what you want是一个队列管理,优先让一个dag(或一组dag)特殊化,进入时获取硬件资源例如,FIFO 方式的队列。

对于优先级,您需要首先使用 Airflow 的一种队列架构(又名并行处理加上使用第三方应用程序进行队列管理),例如 RabbitMQ+CeleryRedis+Celery 然后创建不同的队列并将 A 组 dags 分配给 queue1,将 B 组 dags 分配给 queue2,稍后在设置中更改每个队列的资源规划。

https://www.rabbitmq.com/priority.html

祝你好运