Django Celery:任务永远不会执行

Django Celery: Task never executes

在我的 Django 应用程序中,我使用的是芹菜。在 post_save 信号中,我正在更新弹性搜索中的索引。但由于某种原因,任务挂起并且从未真正执行代码:

我用运行芹菜:

celery -A collegeapp worker -l info

信号:

@receiver(post_save, sender=University)
def university_saved(sender, instance, created, **kwargs):
    """
    University save signal
    """
    print('calling celery task')
    update_university_index.delay(instance.id)
    print('finished')

任务:

@task(name="update_university_index", bind=True, default_retry_delay=5, max_retries=1, acks_late=True)
def update_university_index(instance_id):
    print('updating university index')

我得到的唯一输出是 calling celery task。等待超过 30 分钟后,它再也没有到达任何其他打印语句,视图继续等待。芹菜终端中什么也没有显示。

版本: 姜戈 3.0, 芹菜4.3, Redis 5.0.9, Ubuntu18

更新: 在做了一些测试后,使用 celery.py 文件中定义的 debug_task 代替 update_university_index 不会导致挂起。它的行为符合预期。我想也许它可能是 app.task vs task 装饰器,但似乎不是。

@app.task(bind=True)
def debug_task(text, second_value):
    print('printing  debug_task {} {}'.format(text, second_value))

这在我身上发生过一次,我犯了一个最愚蠢的错误,django 告诉我们在 tasks.py 文件中指定芹菜任务,并将其用于任务发现。之后它起作用了。您能否使用 tree 命令更深入地了解目录结构?

这个tutorial是给flask用的,但是在django中同样可以实现。这个特定教程的亮点在于,在您告诉 celery 执行任务后,它还为您提供了一个 uuid 并且您可以 ping 那 url 并监视您触发的任务的进度.

使用 celery 验证任务是否已注册(确保 celery 是 运行):

from celery.task.control import inspect
i = inspect()
i.registered_tasks()

或bash

$ celery inspect registered
$ celery -A collegeapp inspect registered

来自https://docs.celeryproject.org/en/latest/faq.html#the-worker-isn-t-doing-anything-just-hanging

Why is Task.delay/apply*/the worker just hanging?

Answer: There’s a bug in some AMQP clients that’ll make it hang if it’s not able to authenticate the current user, the password doesn’t match or the user doesn’t have access to the virtual host specified. Be sure to check your broker logs (for RabbitMQ that’s /var/log/rabbitmq/rabbit.log on most systems), it usually contains a message describing the reason.

更改此行

@task(name="update_university_index", bind=True, default_retry_delay=5, max_retries=1, acks_late=True)
def update_university_index(instance_id):
    print('updating university index')

@task(name="update_university_index", bind=True, default_retry_delay=5, max_retries=1, acks_late=True)
def update_university_index(self, instance_id):
    print('updating university index')

或在任务定义中添加self

我仍然不确定为什么它不起作用,但我通过将 task 替换为 app.task

找到了解决方案

从我的 celery.py 导入 app 似乎解决了这个问题。

from collegeapp.celery import app

@app.task(name="update_university_index", bind=True, default_retry_delay=5, max_retries=1, acks_late=True)
def update_university_index(self, instance_id):
    print('updating university index')