Airflow priority_weight across different pools
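I'm using Airflow 2.2.0. I'm trying to prioritize finishing one DAG run completely over making progress on several runs of the same DAG at once. Ideally, I want all tasks of one DAG run to finish before the same tasks start running in parallel for another run.
In principle, using priority_weight should achieve this. But as soon as I use a different pool for each task, it no longer works.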
```python
import logging
# import random
from time import sleep

from airflow.operators.python import PythonOperator  # Airflow 2 import path
from utils.utils import generate_dag  # project-specific helper that builds the DAG

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)


def sleep_and_print(n):
    for i in range(n):
        LOGGER.info(f"Sleeping for {i} seconds")
        sleep(1)


def task1(*args, **kwargs):
    LOGGER.info("task1 started")
    sleep_and_print(10)
    LOGGER.info("task1 finished")
    # if random.randint(0, 1):
    #     raise Exception("task1 failed")


def task2(*args, **kwargs):
    LOGGER.info("task2 started")
    sleep_and_print(2)
    LOGGER.info("task2 finished")


def task3(*args, **kwargs):
    LOGGER.info("task3 started")
    sleep_and_print(10)
    LOGGER.info("task3 finished")


def task4(*args, **kwargs):
    LOGGER.info("task4 started")
    sleep_and_print(15)
    LOGGER.info("task4 finished")


def task5(*args, **kwargs):
    LOGGER.info("task5 started")
    sleep_and_print(10)
    LOGGER.info("task5 finished")


def op_task(dag, task_id, task_func, pool, weight_rule='absolute'):
    return PythonOperator(
        task_id=task_id,
        python_callable=task_func,
        weight_rule=weight_rule,
        pool=pool,  # `pool` takes the pool name (a string), not a Pool object
        dag=dag,
    )


dag = generate_dag('test_prio')

# Each task runs in its own pool
task_1 = op_task(dag, 'task_1', task1, weight_rule='upstream', pool='test_prio')
# task_1 = op_task(dag, 'task_1', task1)
task_2 = op_task(dag, 'task_2', task2, weight_rule='upstream', pool='test_prio2')
task_3 = op_task(dag, 'task_3', task3, weight_rule='upstream', pool='test_prio3')
task_4 = op_task(dag, 'task_4', task4, weight_rule='upstream', pool='test_prio4')
task_5 = op_task(dag, 'task_5', task5, weight_rule='upstream', pool='test_prio5')

task_1 >> task_2 >> task_3
task_1 >> task_4
task_1 >> task_5
```
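For reference, Airflow documents the weight rules as follows: with weight_rule='upstream', a task's effective priority is its own priority_weight plus the priority_weight of all its upstream tasks; 'downstream' (the default) sums the downstream tasks instead, and 'absolute' uses only the task's own weight. The pure-Python sketch below recomputes these values for the DAG above, assuming every task keeps the default priority_weight of 1. The helper names (relatives, effective_weight) are hypothetical; this is an illustration of the rule, not Airflow's own code.

```python
from collections import defaultdict

# Edges of the DAG in the question: task_1 >> task_2 >> task_3, task_1 >> task_4, task_1 >> task_5
EDGES = [
    ("task_1", "task_2"),
    ("task_2", "task_3"),
    ("task_1", "task_4"),
    ("task_1", "task_5"),
]
TASKS = ["task_1", "task_2", "task_3", "task_4", "task_5"]


def relatives(edges, task, direction):
    """Return every transitive upstream or downstream task id of `task`."""
    graph = defaultdict(list)
    for up, down in edges:
        if direction == "downstream":
            graph[up].append(down)
        else:
            graph[down].append(up)
    seen = set()
    stack = list(graph[task])
    while stack:
        node = stack.pop()
        if node not in seen:
            seen.add(node)
            stack.extend(graph[node])
    return seen


def effective_weight(task, weight_rule, edges, weights):
    """Own weight, plus the weights of all upstream/downstream relatives."""
    if weight_rule == "absolute":
        return weights[task]
    if weight_rule not in ("upstream", "downstream"):
        raise ValueError(f"unknown weight_rule: {weight_rule}")
    others = relatives(edges, task, weight_rule)
    return weights[task] + sum(weights[t] for t in others)


weights = {t: 1 for t in TASKS}  # assumption: every task keeps priority_weight=1
for rule in ("downstream", "upstream", "absolute"):
    print(rule, {t: effective_weight(t, rule, EDGES, weights) for t in TASKS})
```

With 'upstream', tasks later in the chain get higher weights (task_3 ends up with 3), which is what should make an already-started run win over task_1 of a new run, as long as both tasks compete in the same pool.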
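As far as I know, priority_weight works at the pool level, i.e. within each pool, but not globally. I couldn't find this stated explicitly in the docs, but the Astronomer guide has examples and further explanation.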
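A toy model makes the consequence concrete: queued task instances are ranked by priority, but a task only competes for the open slots of its own pool. The sketch below is a simplification, not Airflow's scheduler code; QueuedTask and pick_tasks are hypothetical names, and global limits such as parallelism are ignored.

```python
from dataclasses import dataclass


@dataclass
class QueuedTask:
    dag_run: str
    task_id: str
    pool: str
    priority: int


def pick_tasks(queued, open_slots):
    """Toy model: each pool ranks only its own queued tasks by priority."""
    chosen = []
    for pool, slots in open_slots.items():
        candidates = [t for t in queued if t.pool == pool]
        candidates.sort(key=lambda t: t.priority, reverse=True)
        chosen.extend(candidates[:slots])
    return chosen


# One pool per task, as in the question: both runs get a slot
separate = [
    QueuedTask("run_1", "task_2", "test_prio2", priority=2),
    QueuedTask("run_2", "task_1", "test_prio", priority=1),
]
print([t.dag_run for t in pick_tasks(separate, {"test_prio": 1, "test_prio2": 1})])

# Same tasks sharing one single-slot pool: only the higher-priority run proceeds
shared = [
    QueuedTask("run_1", "task_2", "shared", priority=2),
    QueuedTask("run_2", "task_1", "shared", priority=1),
]
print([t.dag_run for t in pick_tasks(shared, {"shared": 1})])
```

Because task_2 of run_1 and task_1 of run_2 never compete for the same slot when they sit in different pools, the higher weight of run_1's task cannot hold run_2 back.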
From that guide:
Pools are meant to control parallelism for Task Instances. If instead you are looking to place limits on the number of concurrent DagRuns for a single DAG or all DAGs, check out the max_active_runs and core.max_active_runs_per_dag parameters respectively
So here is a small example that tests how max_active_runs applies to multiple DagRuns of the same DAG.
From the docs:
:param max_active_runs: maximum number of active DAG runs, beyond this number of DAG runs in a running state, the scheduler won't create new active DAG runs
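The rule in that docstring can be modelled in a few lines: once max_active_runs runs are active, further runs wait instead of starting. schedule_runs is a hypothetical toy function for illustration only, not part of Airflow.

```python
def schedule_runs(triggered, max_active_runs):
    """Toy model: start runs in trigger order until max_active_runs are running."""
    running, queued = [], []
    for run_id in triggered:
        if len(running) < max_active_runs:
            running.append(run_id)
        else:
            queued.append(run_id)  # waits until an active run finishes
    return running, queued


running, queued = schedule_runs(["run_1", "run_2", "run_3"], max_active_runs=1)
print("running:", running)
print("queued:", queued)
```

With max_active_runs=1, the tasks of a second run cannot start until the first run has finished, which gives the "finish one run first" behaviour regardless of which pools the tasks use.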
Example:
```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator

args = {
    "owner": "airflow",
}

with DAG(
    dag_id="example_max_active_runs",
    default_args=args,
    schedule_interval="@once",
    start_date=datetime(2021, 11, 3),
    max_active_runs=1,
    catchup=False,
    tags=["example"],
) as dag:
    main_task = BashOperator(
        task_id="main_task",
        bash_command='echo "waiting.." && sleep 30',
    )
    end = DummyOperator(
        task_id="end",
    )

    main_task >> end
```
Triggering the DAG 3 times from the UI shows that, since max_active_runs=1, only one run is executing while the others are queued:
First DagRun: [screenshot]
Second DagRun: [screenshot]
This feature is still a work in progress in Airflow; an issue has already been raised in the GitHub repo: https://github.com/apache/airflow/issues/13975#issuecomment-806538498