气流:优先考虑 Dags
Airflow: Prioritize the Dags
我正在从元数据 table 中获取类别,并使用 python 脚本为每个类别创建动态 dag。现在,我们有大约 15 个类别,因此每个类别都有自己的 dag。我的 Dag 文件有 3 个任务,并且是 运行ning 顺序。
使用LocalExecutor.All 15 个dags(dag-运行s) 并行触发。我们没有足够的资源(任务很重)来运行并行处理所有 15 个 dag。
有什么方法可以确定 dag-运行 的优先级? 5 个 dag 应该首先 运行,然后接下来的五个应该 运行 等等。工作应该 运行 基于可用资源,其他人应该 queue.This 应该是动态的。
有什么解决这个问题的最佳方法吗?请帮忙。
示例 dag:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'start_date': datetime(2019, 6, 03),
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('test', catchup=False, default_args=default_args, schedule_interval='*/5 * * * *')
t1 = BashOperator(
task_id='print_start_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 50s',
retries=3,
dag=dag)
t3 = BashOperator(
task_id='print_end_date',
bash_command='date',
dag=dag)
t1 >> t2 >> t3
如果你们都运行同时在 LocalExecutor 上并且他们都运行同时
,则没有一个好的有效方法来做到这一点。
如果您要转而使用 CeleryExecutor 并拥有多台工作机器,那么您可以使用 Airflow Queues 的概念来创建一个 "priority" 队列,该队列为您指示为高的 DAG 提供服务优先级。
另一个选项是使用 Sub DAGs。 15 个 DAG 中的每一个都可以按照您想要的顺序构造为子 DAG 和 运行。这是一个可能看起来像的示例:
start ----> Sub Dag 1 --> Sub Dag 6 --> Sub Dag 11
|--> Sub Dag 2 --> Sub Dag 7 --> Sub Dag 12
|--> Sub Dag 3 --> Sub Dag 8 --> Sub Dag 13
|--> Sub Dag 4 --> Sub Dag 9 --> Sub Dag 14
|--> Sub Dag 5 --> Sub Dag 10 --> Sub Dag 15
我正在从元数据 table 中获取类别,并使用 python 脚本为每个类别创建动态 dag。现在,我们有大约 15 个类别,因此每个类别都有自己的 dag。我的 Dag 文件有 3 个任务,并且是 运行ning 顺序。
使用LocalExecutor.All 15 个dags(dag-运行s) 并行触发。我们没有足够的资源(任务很重)来运行并行处理所有 15 个 dag。
有什么方法可以确定 dag-运行 的优先级? 5 个 dag 应该首先 运行,然后接下来的五个应该 运行 等等。工作应该 运行 基于可用资源,其他人应该 queue.This 应该是动态的。
有什么解决这个问题的最佳方法吗?请帮忙。
示例 dag:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'start_date': datetime(2019, 6, 03),
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('test', catchup=False, default_args=default_args, schedule_interval='*/5 * * * *')
t1 = BashOperator(
task_id='print_start_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 50s',
retries=3,
dag=dag)
t3 = BashOperator(
task_id='print_end_date',
bash_command='date',
dag=dag)
t1 >> t2 >> t3
如果你们都运行同时在 LocalExecutor 上并且他们都运行同时
,则没有一个好的有效方法来做到这一点。如果您要转而使用 CeleryExecutor 并拥有多台工作机器,那么您可以使用 Airflow Queues 的概念来创建一个 "priority" 队列,该队列为您指示为高的 DAG 提供服务优先级。
另一个选项是使用 Sub DAGs。 15 个 DAG 中的每一个都可以按照您想要的顺序构造为子 DAG 和 运行。这是一个可能看起来像的示例:
start ----> Sub Dag 1 --> Sub Dag 6 --> Sub Dag 11
|--> Sub Dag 2 --> Sub Dag 7 --> Sub Dag 12
|--> Sub Dag 3 --> Sub Dag 8 --> Sub Dag 13
|--> Sub Dag 4 --> Sub Dag 9 --> Sub Dag 14
|--> Sub Dag 5 --> Sub Dag 10 --> Sub Dag 15