我的自定义 django-admin 命令不会调用 celery 任务

My custom django-admin command won't call celery task

我正在尝试编写一个执行 celery 任务的自定义 django-admin 命令,但是该任务没有执行,当我尝试时 django 只是挂起。

from django.core.management.base import BaseCommand
from myapp.tasks import my_celery_task

class Command(BaseCommand):
    def handle(self, *args, **options):
        print "starting task"
        my_celery_task.delay()
        print "task has been sent"

我调用命令时收到的输出是:

starting task

我从来没有到达 "task has been sent" 行。它只是挂起。我不确定为什么任务不是 运行。 Celery 任务在视图调用时被完美调用。

问题实际上是升级到 High Sierra 后 Mac 上的 RabbitMQ。

这似乎适用于 celery 5.1.2,但如果它是 shared_task

====== tasks.py

from celery import current_app
from celery.schedules import crontab
app = current_app._get_current_object()

@app.task
def my_celery_task():
    # Do something interesting
    return 0 # Return the result of this operation

====== my_command.py

from accounts.tasks import my_celery_task
...
    def handle(self, *args, **options):
        self.stdout.write("Queing request")
        result = my_celery_task.delay()
        self.stdout.write("Waiting for task to end")
        updated = result.wait(timeout=120.0)

        self.stdout.write(f"Returned {updated}")

而且,如果我将任务放在 celery.py 文件中,我最终会遇到死锁,这是一个需要调试的 PITA