如何覆盖芹菜任务的后端

How do I override the backend for celery tasks

我们使用 Redis 作为结果后端。然而,对于一项任务,我们想覆盖它以改用 RabbitMQ。

Task.backend 的文档说:

The result store backend to use for this task. Defaults to the CELERY_RESULT_BACKEND setting

所以我假设我们可以将 Task.backend 设置为与 CELERY_RESULT_BACKEND 接受的格式相同的字符串。

所以我试试这个:

celeryconfig.py

CELERY_RESULT_BACKEND = "redis://redis-host:7777"

tasks.py

@app.task(backend='amqp://guest@localhost/tasks-stg')
def my_task(params):
    ...

但是工人失败了:

[2015-05-07 13:33:49,264: ERROR/Worker-1] Process Worker-1
Traceback (most recent call last):
  File "/project/python2.7_x64/lib/python2.7/site-packages/billiard/process.py", line 292, in _bootstrap
    self.run()
  File "/project/python2.7_x64/lib/python2.7/site-packages/billiard/pool.py", line 286, in run
    self.after_fork()
  File "/project/python2.7_x64/lib/python2.7/site-packages/billiard/pool.py", line 389, in after_fork
    self.initializer(*self.initargs)
  File "/project/python2.7_x64/lib/python2.7/site-packages/celery/concurrency/prefork.py", line 81, in process_initializer
    app=app)
  File "/project/python2.7_x64/lib/python2.7/site-packages/celery/app/trace.py", line 178, in build_tracer
    store_result = backend.store_result
AttributeError: 'str' object has no attribute 'store_result'

文档不正确。 Task.backend 实际上是来自 celery.backends 的后端 class 的一个实例。在这种情况下,要覆盖任务 class 我必须这样做:

from celery.backends.amqp import AMQPBackend

@app.task(backend=AMQPBackend(app, url='amqp://guest@localhost/tasks-stg'))
def my_task(params):
    ...

然而,工作人员继续使用默认设置 class,并且似乎没有提供覆盖它的方法。

扩展并更新原来接受的答案,遇到类似问题的人可能对 List of Options available in Celery task decorator . See Task.backend 部分感兴趣:

The result store backend to use for this task. An instance of one of the backend classes in celery.backends. Defaults to app.backend

此外,AMQP result backend has been removed 自版本 5.0 起,您可以使用 celery.backends.rpc.RPCBackend .

例如:

from celery.backends.rpc import RPCBackend as CeleryRpcBackend
_rpc_backend = CeleryRpcBackend(app=your_celery_app, \
        exchange=RPC_REPLY_EXCHANGE_DEFAULT_NAME, \
        exchange_type=RPC_EXCHANGE_DEFAULT_TYPE )

@your_celery_app.task(backend=_rpc_backend, ... OTHER_ARGS ... )
def your_task_function(**kwargs):
    DO_SOMETHING ...