Airflow priority_weight across different pools

I am using Airflow 2.2.0. I am trying to prioritize finishing one run over starting several runs of the same DAG. Ideally, I would like all tasks of one DAG run to finish completely rather than having the same tasks of multiple runs executing in parallel.

In principle, using priority_weight should make this work. However, as soon as I use a different pool for each task, it no longer has any effect.

# import random
import logging
from time import sleep

from airflow.models.pool import Pool
from airflow.operators.python_operator import PythonOperator

from utils.utils import generate_dag


LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)


def sleep_and_print(n):
    for i in range(n):
        LOGGER.info(f"Sleeping for {i} seconds")
        sleep(1)


def task1(*args, **kwargs):
    LOGGER.info("task1 started")
    sleep_and_print(10)
    LOGGER.info("task1 finished")
    # if random.randint(0, 1):
        # raise Exception("task1 failed")


def task2(*args, **kwargs):
    LOGGER.info("task2 started")
    sleep_and_print(2)
    LOGGER.info("task2 finished")


def task3(*args, **kwargs):
    LOGGER.info("task3 started")
    sleep_and_print(10)
    LOGGER.info("task3 finished")


def task4(*args, **kwargs):
    LOGGER.info("task4 started")
    sleep_and_print(15)
    LOGGER.info("task4 finished")


def task5(*args, **kwargs):
    LOGGER.info("task5 started")
    sleep_and_print(10)
    LOGGER.info("task5 finished")


def op_task(dag, task_id, task_func, pool, wait='absolute'):
    return PythonOperator(
        task_id=task_id,
        python_callable=task_func,
        weight_rule=wait,
        pool=Pool.get_pool(pool),
        dag=dag
    )


dag = generate_dag('test_prio')

task_1 = op_task(dag, 'task_1', task1, wait='upstream', pool='test_prio')
# task_1 = op_task(dag, 'task_1', task1)
task_2 = op_task(dag, 'task_2', task2, wait='upstream', pool='test_prio2')
task_3 = op_task(dag, 'task_3', task3, wait='upstream', pool='test_prio3')
task_4 = op_task(dag, 'task_4', task4, wait='upstream', pool='test_prio4') 
task_5 = op_task(dag, 'task_5', task5, wait='upstream', pool='test_prio5')

task_1 >> task_2 >> task_3
task_1 >> task_4
task_1 >> task_5

As far as I know, priority_weight is applied at the pool level, i.e. it orders task instances within each pool, but not globally. I couldn't find this stated explicitly in the documentation, but the Astronomer guide has examples and further explanation.
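As a quick illustration of that pool-scoped behaviour, here is a minimal sketch (the DAG id, the pool name shared_pool and the weight values are made up for the example, they are not from the question above): when two tasks compete for slots in the same pool, the scheduler picks the task instance with the higher priority_weight first. With one task per pool, as in the question's code, there is no other task in the same pool to compare weights against.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="priority_within_one_pool",
    start_date=datetime(2021, 11, 3),
    schedule_interval=None,
    catchup=False,
) as dag:

    # Both tasks queue in the same (assumed) pool "shared_pool"; when a slot
    # frees up, the scheduler runs the task instance with the higher
    # priority_weight first.
    important = BashOperator(
        task_id="important",
        bash_command='echo "important" && sleep 10',
        pool="shared_pool",
        priority_weight=10,
    )

    routine = BashOperator(
        task_id="routine",
        bash_command='echo "routine" && sleep 10',
        pool="shared_pool",
        priority_weight=1,
    )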

From the article mentioned above:

Pools are meant to control parallelism for Task Instances. If instead you are looking to place limits on the number of concurrent DagRuns for a single DAG or all DAGs, check out the max_active_runs and core.max_active_runs_per_dag parameters respectively

So, here is a small example to test how max_active_runs works with multiple DagRuns of the same DAG:

From the documentation:

:param max_active_runs: maximum number of active DAG runs, beyond this number of DAG runs in a running state, the scheduler won't create new active DAG runs

Example:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator

args = {
    "owner": "airflow",
}

with DAG(
    dag_id="example_max_active_runs",
    default_args=args,
    schedule_interval="@once",
    start_date=datetime(2021, 11, 3),
    max_active_runs=1,
    catchup=False,
    tags=["example", ],
) as dag:

    main_task = BashOperator(
        task_id="main_task",
        bash_command='echo "waiting.." && sleep 30',
    )

    end = DummyOperator(
        task_id="end",
    )

    main_task >> end

Triggering the DAG 3 times from the UI shows that, because max_active_runs=1, only one run is executing while the others are queued:

First DagRun: (screenshot)

Second DagRun: (screenshot)
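For the global counterpart mentioned in the quoted guide, core.max_active_runs_per_dag can be set in airflow.cfg (the value 1 below is only for illustration), or via the equivalent AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG environment variable:

[core]
max_active_runs_per_dag = 1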

This functionality is still being worked on in Airflow; an issue has already been raised for it in the GitHub repo: https://github.com/apache/airflow/issues/13975#issuecomment-806538498