如何按名称限制 运行 Celery 任务的最大数量

How to limit the maximum number of running Celery tasks by name

如何限制可以同时 运行 的特定 Celery 任务的实例数?

我有一个处理大文件的任务。我 运行 遇到一个问题,用户可能会启动多个任务,导致服务器 运行 超出 CPU 和内存,因为它试图一次处理太多文件。我想确保在任何给定时间只有这种类型任务的 N 个实例 运行,并且其他任务将在调度程序中排队,直到其他任务完成。

我看到任务装饰器中有一个 rate_limit 选项,但我认为这不是我想要的。如果我对文档的理解正确,这只会限制任务的启动速度,但不会限制任务的总数 运行ning,所以这会使我的服务器崩溃得更慢。 .但它仍然会崩溃。

我不确定你可以在 Celery 中做到这一点,你可以做的是检查当前有多少任务 运行 当请求到达时,如果它超过最大值 return 一个错误或添加一个机制,定期检查是否有任务的开放槽并运行它(如果你添加这样的机制,你不需要仔细检查,只需在每个请求时将它添加到它的队列中。

为了查看运行任务,可以使用inspect命令。

简而言之:

app = Celery(...)
i = app.control.inspect()
i.active()

您可以做的是将这些任务推送到特定队列,并让 X 数量的工作人员处理它们。在一个有 100 个项目的队列中有两个工作人员将确保只会同时处理两个任务。

您必须设置额外的队列并为其设置所需的并发级别。来自 Routing Tasks:

# Old config style    
CELERY_ROUTES = {
                'app.tasks.limited_task': {'queue': 'limited_queue'}
            } 

from kombu import Exchange, Queue
celery.conf.task_queues = (
        Queue('default', default_exchange, routing_key='default'),
        Queue('limited_queue', default_exchange, routing_key='limited_queue')
    ) 

并启动额外的 worker,仅服务 limited_queue:

$ celery -A celery_app worker -Q limited_queue --loglevel=info -c 1 -n limited_queue

然后你可以使用 Flower 或 inspect 命令

顺利检查所有内容 运行
$ celery -A celery_app worker inspect --help