如何覆盖芹菜任务的后端
How do I override the backend for celery tasks
我们使用 Redis 作为结果后端。然而,对于一项任务,我们想覆盖它以改用 RabbitMQ。
Task.backend 的文档说:
The result store backend to use for this task. Defaults to the
CELERY_RESULT_BACKEND setting
所以我假设我们可以将 Task.backend
设置为与 CELERY_RESULT_BACKEND
接受的格式相同的字符串。
所以我试试这个:
celeryconfig.py
CELERY_RESULT_BACKEND = "redis://redis-host:7777"
tasks.py
@app.task(backend='amqp://guest@localhost/tasks-stg')
def my_task(params):
...
但是工人失败了:
[2015-05-07 13:33:49,264: ERROR/Worker-1] Process Worker-1
Traceback (most recent call last):
File "/project/python2.7_x64/lib/python2.7/site-packages/billiard/process.py", line 292, in _bootstrap
self.run()
File "/project/python2.7_x64/lib/python2.7/site-packages/billiard/pool.py", line 286, in run
self.after_fork()
File "/project/python2.7_x64/lib/python2.7/site-packages/billiard/pool.py", line 389, in after_fork
self.initializer(*self.initargs)
File "/project/python2.7_x64/lib/python2.7/site-packages/celery/concurrency/prefork.py", line 81, in process_initializer
app=app)
File "/project/python2.7_x64/lib/python2.7/site-packages/celery/app/trace.py", line 178, in build_tracer
store_result = backend.store_result
AttributeError: 'str' object has no attribute 'store_result'
文档不正确。 Task.backend
实际上是来自 celery.backends
的后端 class 的一个实例。在这种情况下,要覆盖任务 class 我必须这样做:
from celery.backends.amqp import AMQPBackend
@app.task(backend=AMQPBackend(app, url='amqp://guest@localhost/tasks-stg'))
def my_task(params):
...
然而,工作人员继续使用默认设置 class,并且似乎没有提供覆盖它的方法。
扩展并更新原来接受的答案,遇到类似问题的人可能对 List of Options available in Celery task decorator . See Task.backend 部分感兴趣:
The result store backend to use for this task. An instance of one of the backend classes in celery.backends
. Defaults to app.backend
此外,AMQP result backend has been removed 自版本 5.0 起,您可以使用 celery.backends.rpc.RPCBackend
.
例如:
from celery.backends.rpc import RPCBackend as CeleryRpcBackend
_rpc_backend = CeleryRpcBackend(app=your_celery_app, \
exchange=RPC_REPLY_EXCHANGE_DEFAULT_NAME, \
exchange_type=RPC_EXCHANGE_DEFAULT_TYPE )
@your_celery_app.task(backend=_rpc_backend, ... OTHER_ARGS ... )
def your_task_function(**kwargs):
DO_SOMETHING ...
我们使用 Redis 作为结果后端。然而,对于一项任务,我们想覆盖它以改用 RabbitMQ。
Task.backend 的文档说:
The result store backend to use for this task. Defaults to the CELERY_RESULT_BACKEND setting
所以我假设我们可以将 Task.backend
设置为与 CELERY_RESULT_BACKEND
接受的格式相同的字符串。
所以我试试这个:
celeryconfig.py
CELERY_RESULT_BACKEND = "redis://redis-host:7777"
tasks.py
@app.task(backend='amqp://guest@localhost/tasks-stg')
def my_task(params):
...
但是工人失败了:
[2015-05-07 13:33:49,264: ERROR/Worker-1] Process Worker-1
Traceback (most recent call last):
File "/project/python2.7_x64/lib/python2.7/site-packages/billiard/process.py", line 292, in _bootstrap
self.run()
File "/project/python2.7_x64/lib/python2.7/site-packages/billiard/pool.py", line 286, in run
self.after_fork()
File "/project/python2.7_x64/lib/python2.7/site-packages/billiard/pool.py", line 389, in after_fork
self.initializer(*self.initargs)
File "/project/python2.7_x64/lib/python2.7/site-packages/celery/concurrency/prefork.py", line 81, in process_initializer
app=app)
File "/project/python2.7_x64/lib/python2.7/site-packages/celery/app/trace.py", line 178, in build_tracer
store_result = backend.store_result
AttributeError: 'str' object has no attribute 'store_result'
文档不正确。 Task.backend
实际上是来自 celery.backends
的后端 class 的一个实例。在这种情况下,要覆盖任务 class 我必须这样做:
from celery.backends.amqp import AMQPBackend
@app.task(backend=AMQPBackend(app, url='amqp://guest@localhost/tasks-stg'))
def my_task(params):
...
然而,工作人员继续使用默认设置 class,并且似乎没有提供覆盖它的方法。
扩展并更新原来接受的答案,遇到类似问题的人可能对 List of Options available in Celery task decorator . See Task.backend 部分感兴趣:
The result store backend to use for this task. An instance of one of the backend classes in
celery.backends
. Defaults toapp.backend
此外,AMQP result backend has been removed 自版本 5.0 起,您可以使用 celery.backends.rpc.RPCBackend
.
例如:
from celery.backends.rpc import RPCBackend as CeleryRpcBackend
_rpc_backend = CeleryRpcBackend(app=your_celery_app, \
exchange=RPC_REPLY_EXCHANGE_DEFAULT_NAME, \
exchange_type=RPC_EXCHANGE_DEFAULT_TYPE )
@your_celery_app.task(backend=_rpc_backend, ... OTHER_ARGS ... )
def your_task_function(**kwargs):
DO_SOMETHING ...