在 运行 celery 应用程序中更改 concurrency/manage 队列

Change concurrency/manage queue in running celery app

我正在使用 celery 5 在快速 API 应用程序中管理一些外部任务。 我用一名工人和 8 个并发工作开始芹菜:

celery worker --app=app.worker.celery --concurrency=8 --loglevel=info --logfile=logs/celery.log

我希望能够从快速 API 应用更改并发。我不知道这是否是最好的方法,或者是否可能。 我还没有找到改变并发性的方法所以我尝试添加新的工作人员,使用

from celery import current_app as app
cmd = ["--app=app.worker.celery", "--concurrency=8", "--loglevel=info", "--logfile=logs/celery.log", "--without-gossip" , "--detach", "-E"]
app.worker_main(cmd)

但是即使通过 --detach 这也不起作用,它会阻止请求。

有another/better方法吗?

编辑:

在查看了 flower 1.0.1 是如何做到这一点之后,我能够追踪到正确的 API。 已解决:

from celery import current_app as app
response = app.control.pool_grow(
            n=4, reply=True, destination=[worker_name])

您可以通过继承 celery.worker.autoscale.AutoScaler 并设置 worker_autoscaler 来编写自己的自动缩放器。例如,您的自动调节器可以监控数据库记录,并在记录发生变化时进行扩展。

Flower allows to grow and shrink the pool (here花实现)

from celery import current_app as app
n = 10 # increase the pool size
worker_name = "my_worker"
response = app.control.pool_grow(
            n=n, reply=True, destination=[worker_name])

唯一的问题是它没有在更改后获取池大小的选项