芹菜:@shared_task 和非标准 BROKER_URL

Celery: @shared_task and non-standard BROKER_URL

我有一个 Celery 3.1.19 安装程序,它使用 BROKER_URL 包括一个虚拟主机。

# in settings.py
BROKER_URL = 'amqp://guest:guest@localhost:5672/yard'

Celery 正常启动,加载任务,我在@app.task 装饰器中定义的任务工作正常。我假设我在这端的 rabbitmq 和 celery 配置是正确的。

我用@shared_tasks定义并用app.autodiscover_tasks加载的任务在启动时仍然正确加载。但是,如果我调用任务,消息最终会出现在(仍然存在的)amqp://guest:guest@localhost:5672/ 虚拟主机中。

问题:我在这里缺少什么?共享任务从哪里获取它们的实际配置。

还有更多细节:

# celery_app.py

from celery import Celery

celery_app = Celery('celery_app')
celery_app.config_from_object('settings')

celery_app.autodiscover_tasks(['connectors'])

@celery_app.task
def i_do_work():
    print 'this works'

并且在 connectors/tasks.py 中(在同一文件夹中有一个 __init__.py):

# in connectors/tasks.py

from celery import shared_task

@shared_task
def I_do_not_work():
    print 'bummer'

shared 任务也同样被 Celery 实例接收。它只是缺少向右发送消息的上下文 BROKER_URL.

顺便说一句。为什么 shared_tasks 如此纯粹地记录下来。他们依赖于某些 Django 上下文吗?我没有使用 Django。

或者我的设置中需要额外的参数吗?

非常感谢。

应用程序启动时尚未导入 celery_app。在我的项目中,我在与 celery_app 定义相同的模块级别向 __init__.py 添加了以下代码。

from __future__ import absolute_import

try:
    from .celery_app import celery_app
except ImportError:
    # just in case someone develops application without 
    # celery running    
    pass

Celery 似乎附带了一个完美运行的默认应用程序,这让我感到困惑。在这种情况下,带有 NotImplementedError 的更像接口的结构可能会更有帮助。尽管如此,芹菜还是很棒的。