Celery/RabbitMQ 未确认消息阻塞队列?

Celery/RabbitMQ unacked messages blocking queue?

我已经调用了一个使用 urllib2 远程获取一些信息的任务几千次。这些任务是用随机 eta 安排的(一周内),因此它们不会同时访问服务器。有时我会收到 404,有时不会。我正在处理错误以防万一。

在 RabbitMQ 控制台中,我可以看到 16 条未确认的消息:

我停止了芹菜,清除了队列并重新启动了它。 16条未确认的消息还在。

我还有其他任务进入同一个队列,其中 none 也已执行。清除后,我尝试提交另一个任务,它的状态仍然是 ready:

有什么办法可以找出消息未被确认的原因吗?

版本:

celery==3.1.4
{rabbit,"RabbitMQ","3.5.3"}

celeryapp.py

CELERYBEAT_SCHEDULE = {
    'social_grabber': {
        'task': '<django app>.tasks.task_social_grabber',
        'schedule': crontab(hour=5, minute=0, day_of_week='sunday'),
    },
}

tasks.py

@app.task
def task_social_grabber():
    for user in users:
        eta = randint(0, 60 * 60 * 24 * 7) #week in seconds
        task_social_grabber_single.apply_async((user), countdown=eta)

没有为此任务定义路由,因此它进入默认队列:celery。有一个工作人员在处理这个队列。

supervisord.conf:

[program:celery]
autostart = true
autorestart = true
command = celery worker -A <django app>.celeryapp:app --concurrency=3 -l INFO -n celery

RabbitMQ 破坏了版本 3.3 中的 QoS 设置。您需要将 celery 至少升级到 3.1.11 (changelog) and kombu to at least 3.0.15 (changelog)。您应该使用最新版本。

我在 3.3 发布时遇到了完全相同的行为。 RabbitMQ 翻转了 prefetch_count 标志的默认行为。在此之前,如果消费者达到 eta 消息的 CELERYD_PREFETCH_MULTIPLIER 限制,工作人员将提高此限制以获取更多消息。此更改打破了此行为,因为新的默认行为拒绝了此功能。

我也有类似的症状。到达 MQ 的消息(在图表中可见)但未被工作人员接收的消息。

这使我假设我的 Django 应用程序已正确设置 Celery 应用程序,但我缺少 an import ensuring Celery would be configured during Django startup:

from __future__ import absolute_import

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app  # noqa

这是一个愚蠢的错误,但是到达代理的消息返回了 AsyncResult,这让我偏离了轨道,让我找错了地方。然后我发现设置CELERY_ALWAYS_EAGER = True没有做下蹲,事件然后任务根本没有执行。

PS:这可能不是@kev 问题的答案,但自从我来到这里几次,在寻找我的问题的解决方案时,我post 在这里为任何人在类似的情况下。