我的自定义 django-admin 命令不会调用 celery 任务
My custom django-admin command won't call celery task
我正在尝试编写一个执行 celery 任务的自定义 django-admin 命令,但是该任务没有执行,当我尝试时 django 只是挂起。
from django.core.management.base import BaseCommand
from myapp.tasks import my_celery_task
class Command(BaseCommand):
def handle(self, *args, **options):
print "starting task"
my_celery_task.delay()
print "task has been sent"
我调用命令时收到的输出是:
starting task
我从来没有到达 "task has been sent" 行。它只是挂起。我不确定为什么任务不是 运行。 Celery 任务在视图调用时被完美调用。
问题实际上是升级到 High Sierra 后 Mac 上的 RabbitMQ。
这似乎适用于 celery 5.1.2,但如果它是 shared_task
====== tasks.py
from celery import current_app
from celery.schedules import crontab
app = current_app._get_current_object()
@app.task
def my_celery_task():
# Do something interesting
return 0 # Return the result of this operation
====== my_command.py
from accounts.tasks import my_celery_task
...
def handle(self, *args, **options):
self.stdout.write("Queing request")
result = my_celery_task.delay()
self.stdout.write("Waiting for task to end")
updated = result.wait(timeout=120.0)
self.stdout.write(f"Returned {updated}")
而且,如果我将任务放在 celery.py
文件中,我最终会遇到死锁,这是一个需要调试的 PITA
我正在尝试编写一个执行 celery 任务的自定义 django-admin 命令,但是该任务没有执行,当我尝试时 django 只是挂起。
from django.core.management.base import BaseCommand
from myapp.tasks import my_celery_task
class Command(BaseCommand):
def handle(self, *args, **options):
print "starting task"
my_celery_task.delay()
print "task has been sent"
我调用命令时收到的输出是:
starting task
我从来没有到达 "task has been sent" 行。它只是挂起。我不确定为什么任务不是 运行。 Celery 任务在视图调用时被完美调用。
问题实际上是升级到 High Sierra 后 Mac 上的 RabbitMQ。
这似乎适用于 celery 5.1.2,但如果它是 shared_task
====== tasks.py
from celery import current_app
from celery.schedules import crontab
app = current_app._get_current_object()
@app.task
def my_celery_task():
# Do something interesting
return 0 # Return the result of this operation
====== my_command.py
from accounts.tasks import my_celery_task
...
def handle(self, *args, **options):
self.stdout.write("Queing request")
result = my_celery_task.delay()
self.stdout.write("Waiting for task to end")
updated = result.wait(timeout=120.0)
self.stdout.write(f"Returned {updated}")
而且,如果我将任务放在 celery.py
文件中,我最终会遇到死锁,这是一个需要调试的 PITA