Airflow:如何指定资源池的定量使用?

Airflow: how to specify quantitative usage of a resource pool?

我正在查看几个开源工作流调度程序,用于使用异构 RAM 的作业的 DAG。调度程序不仅应该调度少于最大线程数,还应该将所有并发任务的 RAM 总量保持在可用内存以下。

在这个中解释了

You can set how many of the resource is available in the config, and then how many of the resource the task consumes as a property on the task. This will then limit you to running n of that task at a time.

in config:

[resources]
api=1

in code for Task:

resources = {"api": 1}

对于 Airflow,我无法在其文档中找到相同的功能。最好的办法是 specify a number of available slots in a resource pool,并且还指定任务实例使用资源池中的单个槽。但是,似乎无法指定任务实例在池中使用多个槽。

问题: 专门针对Airflow,如何指定任务实例的量化资源使用量?

假设您正在使用 CeleryExecutor,那么从 airflow 版本 1.9.0 开始,您可以管理 Celery 的任务并发。这不完全是您一直在询问的内存管理,而是执行任务的并发工作线程数。

可调整的参数称为 CELERYD_CONCURRENCY 很好地解释了如何在 Airflow 中管理与芹菜相关的配置。

[编辑]

其实Pools也可以用来限制并发。 假设您想要限制资源匮乏 task_id,以便只有 2 个实例同时 运行。您唯一需要做的是:

  • create pool (in UI: Admin -> Pools) 为其分配名称,例如my_pool 并在字段 Slots 中定义任务的并发性(在本例中为 2

  • 在实例化将执行此 task_idOperator 时,传递定义的池名称 (pool=my_pool)