在 运行 celery 应用程序中更改 concurrency/manage 队列
Change concurrency/manage queue in running celery app
我正在使用 celery 5 在快速 API 应用程序中管理一些外部任务。
我用一名工人和 8 个并发工作开始芹菜:
celery worker --app=app.worker.celery --concurrency=8 --loglevel=info --logfile=logs/celery.log
我希望能够从快速 API 应用更改并发。我不知道这是否是最好的方法,或者是否可能。
我还没有找到改变并发性的方法所以我尝试添加新的工作人员,使用
from celery import current_app as app
cmd = ["--app=app.worker.celery", "--concurrency=8", "--loglevel=info", "--logfile=logs/celery.log", "--without-gossip" , "--detach", "-E"]
app.worker_main(cmd)
但是即使通过 --detach
这也不起作用,它会阻止请求。
有another/better方法吗?
编辑:
在查看了 flower 1.0.1 是如何做到这一点之后,我能够追踪到正确的 API。
已解决:
from celery import current_app as app
response = app.control.pool_grow(
n=4, reply=True, destination=[worker_name])
您可以通过继承 celery.worker.autoscale.AutoScaler
并设置 worker_autoscaler
来编写自己的自动缩放器。例如,您的自动调节器可以监控数据库记录,并在记录发生变化时进行扩展。
Flower allows to grow and shrink the pool (here花实现)
from celery import current_app as app
n = 10 # increase the pool size
worker_name = "my_worker"
response = app.control.pool_grow(
n=n, reply=True, destination=[worker_name])
唯一的问题是它没有在更改后获取池大小的选项
我正在使用 celery 5 在快速 API 应用程序中管理一些外部任务。 我用一名工人和 8 个并发工作开始芹菜:
celery worker --app=app.worker.celery --concurrency=8 --loglevel=info --logfile=logs/celery.log
我希望能够从快速 API 应用更改并发。我不知道这是否是最好的方法,或者是否可能。 我还没有找到改变并发性的方法所以我尝试添加新的工作人员,使用
from celery import current_app as app
cmd = ["--app=app.worker.celery", "--concurrency=8", "--loglevel=info", "--logfile=logs/celery.log", "--without-gossip" , "--detach", "-E"]
app.worker_main(cmd)
但是即使通过 --detach
这也不起作用,它会阻止请求。
有another/better方法吗?
编辑:
在查看了 flower 1.0.1 是如何做到这一点之后,我能够追踪到正确的 API。 已解决:
from celery import current_app as app
response = app.control.pool_grow(
n=4, reply=True, destination=[worker_name])
您可以通过继承 celery.worker.autoscale.AutoScaler
并设置 worker_autoscaler
来编写自己的自动缩放器。例如,您的自动调节器可以监控数据库记录,并在记录发生变化时进行扩展。
Flower allows to grow and shrink the pool (here花实现)
from celery import current_app as app
n = 10 # increase the pool size
worker_name = "my_worker"
response = app.control.pool_grow(
n=n, reply=True, destination=[worker_name])
唯一的问题是它没有在更改后获取池大小的选项