如何控制 Airflow 安装的并行度或并发度?
How to control the parallelism or concurrency of an Airflow installation?
在我的一些 Apache Airflow 安装中,计划 运行 的 DAG 或任务不会 运行 即使调度程序似乎没有完全加载。如何增加可以同时 运行 的 DAG 或任务的数量?
同样,如果我的安装处于高负载状态,并且我想限制我的 Airflow worker 拉取排队任务的速度(例如减少资源消耗),我可以调整什么来降低平均负载?
这是自 Airflow v1.10.2 以来可用的配置选项的扩展列表。有些可以在 per-DAG 或 per-operator 的基础上设置,但在未指定时也可能回退到 setup-wide 默认值。
可以在per-DAG基础上指定的选项:
concurrency
:允许 运行 同时跨越所设置的 DAG 的所有活动 运行 的任务实例数。如果未设置则默认为 core.dag_concurrency
max_active_runs
:此 DAG 的最大活动 运行 数。一旦达到此限制,调度程序将不会创建新的活动 DAG 运行。如果未设置则默认为 core.max_active_runs_per_dag
示例:
# Only allow one run of this DAG to be running at any given time
dag = DAG('my_dag_id', max_active_runs=1)
# Allow a maximum of 10 tasks to be running across a max of 2 active DAG runs
dag = DAG('example2', concurrency=10, max_active_runs=2)
可以在per-operator基础上指定的选项:
pool
:在其中执行任务的池。Pools 可用于限制 仅任务的一个子集 的并行度
task_concurrency
:同一任务跨多个DAG的并发限制运行s
示例:
t1 = BaseOperator(pool='my_custom_pool', task_concurrency=12)
在整个 Airflow 设置中指定的选项:
core.parallelism
:整个 Airflow 安装中的最大任务数 运行ning
core.dag_concurrency
:每个 DAG 可以 运行ning 的最大任务数(跨多个 DAG 运行s)
core.non_pooled_task_slot_count
:分配给不在池中的任务的任务槽数运行
core.max_active_runs_per_dag
:每个 DAG 的活动 DAG 的最大数量 运行s
scheduler.max_threads
:调度程序进程应该使用多少个线程来调度 DAG
celery.worker_concurrency
:worker 一次处理的最大任务实例数如果使用 CeleryExecutor
celery.sync_parallelism
:CeleryExecutor 用于同步任务状态的进程数
检查使用 core.executor 的气流配置。
SequentialExecutor 会顺序执行,所以你可以选择Local Executor 或者Clery Executor 来并行执行任务。
之后,您可以使用@hexacyanide
提到的其他选项
三个主要并发控制变量的说明:
从 airflow 版本 2.2 开始,task_concurrency
参数已被 max_active_tis_per_dag
弃用。
https://airflow.apache.org/docs/stable/faq.html#how-can-my-airflow-dag-run-faster
在我的一些 Apache Airflow 安装中,计划 运行 的 DAG 或任务不会 运行 即使调度程序似乎没有完全加载。如何增加可以同时 运行 的 DAG 或任务的数量?
同样,如果我的安装处于高负载状态,并且我想限制我的 Airflow worker 拉取排队任务的速度(例如减少资源消耗),我可以调整什么来降低平均负载?
这是自 Airflow v1.10.2 以来可用的配置选项的扩展列表。有些可以在 per-DAG 或 per-operator 的基础上设置,但在未指定时也可能回退到 setup-wide 默认值。
可以在per-DAG基础上指定的选项:
concurrency
:允许 运行 同时跨越所设置的 DAG 的所有活动 运行 的任务实例数。如果未设置则默认为core.dag_concurrency
max_active_runs
:此 DAG 的最大活动 运行 数。一旦达到此限制,调度程序将不会创建新的活动 DAG 运行。如果未设置则默认为core.max_active_runs_per_dag
示例:
# Only allow one run of this DAG to be running at any given time
dag = DAG('my_dag_id', max_active_runs=1)
# Allow a maximum of 10 tasks to be running across a max of 2 active DAG runs
dag = DAG('example2', concurrency=10, max_active_runs=2)
可以在per-operator基础上指定的选项:
pool
:在其中执行任务的池。Pools 可用于限制 仅任务的一个子集 的并行度task_concurrency
:同一任务跨多个DAG的并发限制运行s
示例:
t1 = BaseOperator(pool='my_custom_pool', task_concurrency=12)
在整个 Airflow 设置中指定的选项:
core.parallelism
:整个 Airflow 安装中的最大任务数 运行ningcore.dag_concurrency
:每个 DAG 可以 运行ning 的最大任务数(跨多个 DAG 运行s)core.non_pooled_task_slot_count
:分配给不在池中的任务的任务槽数运行core.max_active_runs_per_dag
:每个 DAG 的活动 DAG 的最大数量 运行s
scheduler.max_threads
:调度程序进程应该使用多少个线程来调度 DAGcelery.worker_concurrency
:worker 一次处理的最大任务实例数如果使用 CeleryExecutorcelery.sync_parallelism
:CeleryExecutor 用于同步任务状态的进程数
检查使用 core.executor 的气流配置。 SequentialExecutor 会顺序执行,所以你可以选择Local Executor 或者Clery Executor 来并行执行任务。 之后,您可以使用@hexacyanide
提到的其他选项三个主要并发控制变量的说明:
从 airflow 版本 2.2 开始,task_concurrency
参数已被 max_active_tis_per_dag
弃用。
https://airflow.apache.org/docs/stable/faq.html#how-can-my-airflow-dag-run-faster