使用 Celery 在 django 中执行定期任务的问题

Problem with Periodic tasks in django with Celery

我的 celery 周期性任务不起作用。我希望每晚根据日期更新我的数据库。这是我在应用程序目录中的 ptasks.py 文件:

'''
import datetime
import celery
from celery.task.schedules import crontab
from celery.decorators import periodic_task
from django.utils import timezone
@periodic_task(run_every=crontab(hour=0, minute=0))
def every_night():
    tasks=Task.objects.all()
    form=TaskForm()
    if form.deadline<timezone.now() and form.staus=="ONGOING":
        form.status="PENDING"
        form.save()
'''

我在 settings.py 中使用 ampq:

'''
CELERY_BROKER_URL = 'amqp://guest:guest@localhost'
CELERYBEAT_SCHEDULER='djcelery.schedulers.DatabaseScheduler'
'''

这是我的 models.py:

'''
from django.db import models
import datetime
import pytz
from django.utils import timezone
# Create your models here.
class Task(models.Model):
    title=models.CharField(max_length=30)
    complete=models.BooleanField(default=False)
    created=models.DateTimeField(default=timezone.now())
    comment=models.CharField(max_length=500, default="")
    Tag1=models.CharField(max_length=10, default="Tag1")
    deadline=models.DateTimeField(default=timezone.now())
    status=models.CharField(max_length=15,default="ONGOING")
    def __str__(self):
        return self.title
'''

这是我的 forms.py:

'''
from django import forms
from django.forms import ModelForm

from .models import *

class TaskForm(forms.ModelForm):
    class Meta:
        model=Task
        fields='__all__'

'''

解决方案 1

要按照您的计划更新数据库记录,您可以这样做:

<your_app>/models.py

from django.db import models
from django.utils import timezone


class Task(models.Model):

    STATUS_PENDING = "pending"
    STATUS_ONGOING = "ongoing"

    STATUS_CHOICES = (
        (STATUS_ONGOING, "ongoing"),
        (STATUS_PENDING, "pending"),
    )

    status = models.CharField(
        max_length=15, default=STATUS_ONGOING, choices=STATUS_CHOICES
    )
    title = models.CharField(max_length=30)
    deadline = models.DateTimeField(auto_now_add=True)
    complete = models.BooleanField(default=False)

    def __str__(self):
        return self.title

<your_app>/tasks.py

from django.utils import timezone
from celery.task.schedules import crontab
from celery.decorators import periodic_task

from .models import Task


@periodic_task(run_every=crontab(hour=0, minute=0))
def every_night():

    qs = Task.objects.filter(deadline__lt=timezone.now(), status=Task.STATUS_ONGOING)

    # Update whole queryset at once
    qs.update(status=Task.STATUS_PENDING)

    # Alternatively update one by one (e.g. if you need signals to be fired)
    # for task in qs:
    #     task.status = Task.STATUS_PENDING
    #     task.save()

确保运行芹菜"beat"触发周期性任务:

celery -A app worker -B 

解决方案 2

但是 - 对于所描述的情况,老实说,我不明白为什么要走弯路在不同的数据库字段中添加信息,而这些信息无论如何都可以很容易地从每条记录中导出。这增加了数据库本身的冗余并且似乎是不需要的。

为什么不直接使用 Manager 来轻松获得所需的模型实例?

from django.db import models
from django.utils import timezone


class TaskManager(models.Manager):
    def pending(self):
        return (
            self.get_queryset()
            .filter(deadline__lt=timezone.now())
            .exclude(complete=True)
        )


class Task(models.Model):

    title = models.CharField(max_length=30)
    deadline = models.DateTimeField(auto_now_add=True)
    complete = models.BooleanField(default=False)

    objects = TaskManager()

    def __str__(self):
        return self.title

并获取 "pending" 查询集:Task.objects.pending()