Django Celery delay() 总是推送到默认 'celery' 队列
Django Celery delay() always pushing to default 'celery' queue
我用这个把我的头发扯掉了。
我的问题的症结在于,在 settings.py
中使用 Django CELERY_DEFAULT_QUEUE
设置不会强制我的任务进入我设置的特定队列。它总是转到我代理中的默认 celery
队列。
但是,如果我在 shared_task
装饰器中指定 queue=proj:dev
,它将进入正确的队列。它的行为符合预期。
我的设置如下:
- 我本地主机上的 Django 代码(用于测试等)。通过 Django 的 shell (
manage.py shell
) 执行任务 .delay()
- 配置为我的代理的远程 Redis 实例
- 2 个 celery worker 在远程机器上配置并等待来自 Redis 的消息(在 Google App Engine - 可能不相关)
注意:对于下面的代码片段,我已经模糊了项目名称并使用 proj
作为占位符。
celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery, shared_task
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
app = Celery('proj')
app.config_from_object('django.conf:settings', namespace='CELERY', force=True)
app.autodiscover_tasks()
@shared_task
def add(x, y):
return x + y
settings.py
...
CELERY_RESULT_BACKEND = 'django-db'
CELERY_BROKER_URL = 'redis://:{}@{}:6379/0'.format(
os.environ.get('REDIS_PASSWORD'),
os.environ.get('REDIS_HOST', 'alice-redis-vm'))
CELERY_DEFAULT_QUEUE = os.environ.get('CELERY_DEFAULT_QUEUE', 'proj:dev')
我的想法是,就目前而言,我希望为我的代码所在的不同环境设置不同的队列:dev、staging、prod。因此,在 Google App Engine 上,我定义了一个基于单个 App Engine 服务传递的环境变量。
步骤
因此,通过上述配置,我使用 ./manage.py shell
和 运行 add.delay(2, 2)
启动了 shell。我收到 AsyncResult
返回,但 Redis 监视器清楚地显示消息已发送到默认 celery
队列:
1497566026.117419 [0 155.93.144.189:58887] "LPUSH" "celery"
...
我错过了什么?
我不想给工作带来麻烦,但我觉得今天有一点确实奏效了。但是对于我的一生,我想不出我大脑的哪一部分让我失望了。
堆栈版本:
python
: 3.5.2
celery
: 4.0.2
redis
: 2.10.5
django
: 1.10.4
这个问题比我想象的要简单得多——文档不正确!!
Celery 文档要求我们使用 CELERY_DEFAULT_QUEUE
在 celery 对象上设置 task_default_queue
配置。
参考:http://docs.celeryproject.org/en/latest/userguide/configuration.html#new-lowercase-settings
我们目前应该使用 CELERY_TASK_DEFAULT_QUEUE
。这是所有其他设置名称的命名不一致。它是在 Github 上提出的 - https://github.com/celery/celery/issues/3772
解决方案总结
在配置模块中使用CELERY_DEFAULT_QUEUE
(使用config_from_object
)对队列没有影响。
改用CELERY_TASK_DEFAULT_QUEUE
。
如果你来到这里是因为你试图在 Celery 中使用 SQS 实现一个预定义的队列,并且发现 Celery 在 SQS 中创建了一个名为“celery”的新队列,不管你说什么,你已经到了结尾你的旅途朋友。
在将 broker_transport_options
传递给 Celery 之前,更改您的默认队列 and/or 指定您将明确使用的队列。在我的例子中,我只需要一个队列,所以执行以下操作:
celery.conf.task_default_queue = "<YOUR_PREDEFINED_QUEUE_NAME_IN_SQS">
我用这个把我的头发扯掉了。
我的问题的症结在于,在 settings.py
中使用 Django CELERY_DEFAULT_QUEUE
设置不会强制我的任务进入我设置的特定队列。它总是转到我代理中的默认 celery
队列。
但是,如果我在 shared_task
装饰器中指定 queue=proj:dev
,它将进入正确的队列。它的行为符合预期。
我的设置如下:
- 我本地主机上的 Django 代码(用于测试等)。通过 Django 的 shell (
manage.py shell
) 执行任务 - 配置为我的代理的远程 Redis 实例
- 2 个 celery worker 在远程机器上配置并等待来自 Redis 的消息(在 Google App Engine - 可能不相关)
.delay()
注意:对于下面的代码片段,我已经模糊了项目名称并使用 proj
作为占位符。
celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery, shared_task
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
app = Celery('proj')
app.config_from_object('django.conf:settings', namespace='CELERY', force=True)
app.autodiscover_tasks()
@shared_task
def add(x, y):
return x + y
settings.py
...
CELERY_RESULT_BACKEND = 'django-db'
CELERY_BROKER_URL = 'redis://:{}@{}:6379/0'.format(
os.environ.get('REDIS_PASSWORD'),
os.environ.get('REDIS_HOST', 'alice-redis-vm'))
CELERY_DEFAULT_QUEUE = os.environ.get('CELERY_DEFAULT_QUEUE', 'proj:dev')
我的想法是,就目前而言,我希望为我的代码所在的不同环境设置不同的队列:dev、staging、prod。因此,在 Google App Engine 上,我定义了一个基于单个 App Engine 服务传递的环境变量。
步骤
因此,通过上述配置,我使用 ./manage.py shell
和 运行 add.delay(2, 2)
启动了 shell。我收到 AsyncResult
返回,但 Redis 监视器清楚地显示消息已发送到默认 celery
队列:
1497566026.117419 [0 155.93.144.189:58887] "LPUSH" "celery"
...
我错过了什么?
我不想给工作带来麻烦,但我觉得今天有一点确实奏效了。但是对于我的一生,我想不出我大脑的哪一部分让我失望了。
堆栈版本:
python
: 3.5.2celery
: 4.0.2redis
: 2.10.5django
: 1.10.4
这个问题比我想象的要简单得多——文档不正确!!
Celery 文档要求我们使用 CELERY_DEFAULT_QUEUE
在 celery 对象上设置 task_default_queue
配置。
参考:http://docs.celeryproject.org/en/latest/userguide/configuration.html#new-lowercase-settings
我们目前应该使用 CELERY_TASK_DEFAULT_QUEUE
。这是所有其他设置名称的命名不一致。它是在 Github 上提出的 - https://github.com/celery/celery/issues/3772
解决方案总结
在配置模块中使用CELERY_DEFAULT_QUEUE
(使用config_from_object
)对队列没有影响。
改用CELERY_TASK_DEFAULT_QUEUE
。
如果你来到这里是因为你试图在 Celery 中使用 SQS 实现一个预定义的队列,并且发现 Celery 在 SQS 中创建了一个名为“celery”的新队列,不管你说什么,你已经到了结尾你的旅途朋友。
在将 broker_transport_options
传递给 Celery 之前,更改您的默认队列 and/or 指定您将明确使用的队列。在我的例子中,我只需要一个队列,所以执行以下操作:
celery.conf.task_default_queue = "<YOUR_PREDEFINED_QUEUE_NAME_IN_SQS">