Celery/RabbitMQ 未确认消息阻塞队列?
Celery/RabbitMQ unacked messages blocking queue?
我已经调用了一个使用 urllib2 远程获取一些信息的任务几千次。这些任务是用随机 eta 安排的(一周内),因此它们不会同时访问服务器。有时我会收到 404,有时不会。我正在处理错误以防万一。
在 RabbitMQ 控制台中,我可以看到 16 条未确认的消息:
我停止了芹菜,清除了队列并重新启动了它。 16条未确认的消息还在。
我还有其他任务进入同一个队列,其中 none 也已执行。清除后,我尝试提交另一个任务,它的状态仍然是 ready:
有什么办法可以找出消息未被确认的原因吗?
版本:
celery==3.1.4
{rabbit,"RabbitMQ","3.5.3"}
celeryapp.py
CELERYBEAT_SCHEDULE = {
'social_grabber': {
'task': '<django app>.tasks.task_social_grabber',
'schedule': crontab(hour=5, minute=0, day_of_week='sunday'),
},
}
tasks.py
@app.task
def task_social_grabber():
for user in users:
eta = randint(0, 60 * 60 * 24 * 7) #week in seconds
task_social_grabber_single.apply_async((user), countdown=eta)
没有为此任务定义路由,因此它进入默认队列:celery。有一个工作人员在处理这个队列。
supervisord.conf:
[program:celery]
autostart = true
autorestart = true
command = celery worker -A <django app>.celeryapp:app --concurrency=3 -l INFO -n celery
RabbitMQ 破坏了版本 3.3 中的 QoS 设置。您需要将 celery 至少升级到 3.1.11 (changelog) and kombu to at least 3.0.15 (changelog)。您应该使用最新版本。
我在 3.3 发布时遇到了完全相同的行为。 RabbitMQ 翻转了 prefetch_count 标志的默认行为。在此之前,如果消费者达到 eta 消息的 CELERYD_PREFETCH_MULTIPLIER 限制,工作人员将提高此限制以获取更多消息。此更改打破了此行为,因为新的默认行为拒绝了此功能。
我也有类似的症状。到达 MQ 的消息(在图表中可见)但未被工作人员接收的消息。
这使我假设我的 Django 应用程序已正确设置 Celery 应用程序,但我缺少 an import ensuring Celery would be configured during Django startup:
from __future__ import absolute_import
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app # noqa
这是一个愚蠢的错误,但是到达代理的消息返回了 AsyncResult,这让我偏离了轨道,让我找错了地方。然后我发现设置CELERY_ALWAYS_EAGER = True
没有做下蹲,事件然后任务根本没有执行。
PS:这可能不是@kev 问题的答案,但自从我来到这里几次,在寻找我的问题的解决方案时,我post 在这里为任何人在类似的情况下。
我已经调用了一个使用 urllib2 远程获取一些信息的任务几千次。这些任务是用随机 eta 安排的(一周内),因此它们不会同时访问服务器。有时我会收到 404,有时不会。我正在处理错误以防万一。
在 RabbitMQ 控制台中,我可以看到 16 条未确认的消息:
我停止了芹菜,清除了队列并重新启动了它。 16条未确认的消息还在。
我还有其他任务进入同一个队列,其中 none 也已执行。清除后,我尝试提交另一个任务,它的状态仍然是 ready:
有什么办法可以找出消息未被确认的原因吗?
版本:
celery==3.1.4
{rabbit,"RabbitMQ","3.5.3"}
celeryapp.py
CELERYBEAT_SCHEDULE = {
'social_grabber': {
'task': '<django app>.tasks.task_social_grabber',
'schedule': crontab(hour=5, minute=0, day_of_week='sunday'),
},
}
tasks.py
@app.task
def task_social_grabber():
for user in users:
eta = randint(0, 60 * 60 * 24 * 7) #week in seconds
task_social_grabber_single.apply_async((user), countdown=eta)
没有为此任务定义路由,因此它进入默认队列:celery。有一个工作人员在处理这个队列。
supervisord.conf:
[program:celery]
autostart = true
autorestart = true
command = celery worker -A <django app>.celeryapp:app --concurrency=3 -l INFO -n celery
RabbitMQ 破坏了版本 3.3 中的 QoS 设置。您需要将 celery 至少升级到 3.1.11 (changelog) and kombu to at least 3.0.15 (changelog)。您应该使用最新版本。
我在 3.3 发布时遇到了完全相同的行为。 RabbitMQ 翻转了 prefetch_count 标志的默认行为。在此之前,如果消费者达到 eta 消息的 CELERYD_PREFETCH_MULTIPLIER 限制,工作人员将提高此限制以获取更多消息。此更改打破了此行为,因为新的默认行为拒绝了此功能。
我也有类似的症状。到达 MQ 的消息(在图表中可见)但未被工作人员接收的消息。
这使我假设我的 Django 应用程序已正确设置 Celery 应用程序,但我缺少 an import ensuring Celery would be configured during Django startup:
from __future__ import absolute_import
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app # noqa
这是一个愚蠢的错误,但是到达代理的消息返回了 AsyncResult,这让我偏离了轨道,让我找错了地方。然后我发现设置CELERY_ALWAYS_EAGER = True
没有做下蹲,事件然后任务根本没有执行。
PS:这可能不是@kev 问题的答案,但自从我来到这里几次,在寻找我的问题的解决方案时,我post 在这里为任何人在类似的情况下。