我如何在 Django 模型中使用 celery beat 为每个对象创建单独的任务
How do I create separate tasks for every object with celery beat in Django models
假设我们有以下模型字段:
class Project(models.Model):
    """A project that can be scanned periodically."""

    # Human-readable project name; no two projects may share it.
    project_name = models.CharField(unique=True, max_length=200)
    # Scan interval.
    project_scan = models.IntegerField()
    # Enables ("Scan") or disables the scan tasks for this project.
    project_status = models.BooleanField()
假设我们有 2 个项目对象:
1. Project(project_name='test1',project_scan=5) ### Scan `test1` every `5` hour
2. Project(project_name='test2',project_scan=10) ### Scan `test2` every `10` hour
Tasks.py
@task(name='project_tasks')
def Project_Tasks():
    """Print the name of every project whose scan is enabled.

    Loads all ``Project`` rows and, for each one whose ``project_status``
    is true, prints its ``project_name``.
    """
    get_all_projects = Project.objects.all()
    # The loop must iterate over ``get_all_projects``; the previous spelling
    # ``get_all_project`` was an undefined name and raised NameError.
    for each_project in get_all_projects:
        if each_project.project_status:  # scan allowed for this project
            print(each_project.project_name)
我的问题:
我如何根据每个对象各自的 project_scan 间隔,为每个对象单独运行任务?
因为 Celery beat 是以任务名称作为参数来执行扫描的,例如:
# Creates one beat schedule entry; `task` is the name the task was registered
# under (see @task(name='project_tasks')), `name` labels this particular entry.
PeriodicTask.objects.create(interval=given_interval, name='I dont know', task='project_tasks', )
如何为每个项目任务创建单独的实例?
我尝试在 models.py 中创建 intervalSchedule 字段但没有成功:
class Project(models.Model):
    """Project model from the attempt that tried to embed a schedule directly."""

    # Human-readable project name; no two projects may share it.
    project_name = models.CharField(unique=True, max_length=200)
    # Scan interval.
    project_scan = models.IntegerField()
    # Enables ("Scan") or disables the scan tasks for this project.
    project_status = models.BooleanField()
    # NOTE(review): this instantiates IntervalSchedule once at class-definition
    # time and stores it as a plain class attribute; it is presumably not a
    # database field, which would explain why this attempt did not work.
    schedule = IntervalSchedule()
您可以使用信号(signals)来实现:
from django.db.models.signals import post_save
from django.dispatch import receiver
from django_celery_beat.models import PeriodicTask, IntervalSchedule
class Project(models.Model):
    """Project whose scan schedule is mirrored by a ``PeriodicTask`` row.

    ``project_scan`` is the scan interval in hours and ``project_status``
    tells whether the periodic task for this project should be enabled.
    """

    project_name = models.CharField(max_length=200, unique=True)
    project_scan = models.IntegerField()  # scan interval, in hours
    project_status = models.BooleanField()  # True -> periodic task enabled

    def _periodic_task_name(self):
        """Return the ``PeriodicTask.name`` used for this project."""
        # NOTE(review): the name embeds ``project_name``; renaming a project
        # after creation changes this value and the existing task will no
        # longer be found. Kept as-is so already-created rows still match.
        return f'{self.project_name}-{self.id}'

    def set_periodic_task(self, task_name):
        """Create the ``PeriodicTask`` that runs ``task_name`` for this project.

        The task is created with ``enabled`` equal to ``project_status`` so a
        project saved as disabled does not start out with a running task.
        """
        schedule = self.get_or_create_interval()
        PeriodicTask.objects.create(
            interval=schedule,
            name=self._periodic_task_name(),
            task=task_name,
            enabled=self.project_status,
        )

    def get_or_create_interval(self):
        """Return the ``IntervalSchedule`` for "every ``project_scan`` hours"."""
        schedule, _ = IntervalSchedule.objects.get_or_create(
            every=self.project_scan,
            period=IntervalSchedule.HOURS,
        )
        return schedule

    def get_periodic_task(self, task_name):
        """Return this project's ``PeriodicTask`` for ``task_name``.

        Raises ``PeriodicTask.DoesNotExist`` when no such task was created.
        """
        # Look the task up by name and task only. Filtering on the interval
        # derived from the *current* ``project_scan`` made the lookup fail
        # with DoesNotExist as soon as the scan interval was edited.
        return PeriodicTask.objects.get(
            name=self._periodic_task_name(),
            task=task_name,
        )

    def sync_disable_enable_task(self, task_name):
        """Push ``project_status`` and ``project_scan`` to the periodic task."""
        periodic_task = self.get_periodic_task(task_name)
        periodic_task.enabled = self.project_status
        # Keep the schedule in step with an edited ``project_scan``.
        periodic_task.interval = self.get_or_create_interval()
        periodic_task.save()
@receiver(post_save, sender=Project)
def set_or_sync_periodic_task(sender, instance=None, created=False, **kwargs):
    """Keep the project's periodic task in step with every ``Project`` save.

    On the first save (``created`` is true) the periodic task is created via
    ``set_periodic_task``; on later saves ``sync_disable_enable_task`` is
    called instead.
    """
    if not created:
        instance.sync_disable_enable_task(task_name='project_tasks')
        return
    instance.set_periodic_task(task_name='project_tasks')
这样做的效果:
当您创建新的 Project 实例时,post_save 信号会调用 set_periodic_task 方法,
为该实例保存一个新的定期任务。
如果您想禁用(disable)或启用(enable)某个实例的定期任务,
只需修改 project_status 并保存即可;
这会触发 sync_disable_enable_task 方法,
从而启用或禁用对应的定期任务。
如果你想传递参数,你可以这样做:
# Arguments for the task are passed as JSON-encoded strings: ``args`` holds
# the positional arguments and ``kwargs`` the keyword arguments.
PeriodicTask.objects.create(
    interval=schedule,
    name=f'{self.project_name}-{self.id}',
    task='proj.tasks.import_contacts',
    args=json.dumps(['arg1', 'arg2']),
    kwargs=json.dumps({
        # The closing quote was missing here, which made the snippet a
        # SyntaxError.
        'some_kwarg': '123',
    }),
)
假设我们有以下模型字段:
class Project(models.Model):
    """A project that can be scanned periodically."""

    # Human-readable project name; no two projects may share it.
    project_name = models.CharField(unique=True, max_length=200)
    # Scan interval.
    project_scan = models.IntegerField()
    # Enables ("Scan") or disables the scan tasks for this project.
    project_status = models.BooleanField()
假设我们有 2 个项目对象:
1. Project(project_name='test1',project_scan=5) ### Scan `test1` every `5` hour
2. Project(project_name='test2',project_scan=10) ### Scan `test2` every `10` hour
Tasks.py
@task(name='project_tasks')
def Project_Tasks():
    """Print the name of every project whose scan is enabled.

    Loads all ``Project`` rows and, for each one whose ``project_status``
    is true, prints its ``project_name``.
    """
    get_all_projects = Project.objects.all()
    # The loop must iterate over ``get_all_projects``; the previous spelling
    # ``get_all_project`` was an undefined name and raised NameError.
    for each_project in get_all_projects:
        if each_project.project_status:  # scan allowed for this project
            print(each_project.project_name)
我的问题:
我如何根据每个对象各自的 project_scan 间隔,为每个对象单独运行任务?
因为 Celery beat 是以任务名称作为参数来执行扫描的,例如:
# Creates one beat schedule entry; `task` is the name the task was registered
# under (see @task(name='project_tasks')), `name` labels this particular entry.
PeriodicTask.objects.create(interval=given_interval, name='I dont know', task='project_tasks', )
如何为每个项目任务创建单独的实例?
我尝试在 models.py 中创建 intervalSchedule 字段但没有成功:
class Project(models.Model):
    """Project model from the attempt that tried to embed a schedule directly."""

    # Human-readable project name; no two projects may share it.
    project_name = models.CharField(unique=True, max_length=200)
    # Scan interval.
    project_scan = models.IntegerField()
    # Enables ("Scan") or disables the scan tasks for this project.
    project_status = models.BooleanField()
    # NOTE(review): this instantiates IntervalSchedule once at class-definition
    # time and stores it as a plain class attribute; it is presumably not a
    # database field, which would explain why this attempt did not work.
    schedule = IntervalSchedule()
您可以使用信号(signals)来实现:
from django.db.models.signals import post_save
from django.dispatch import receiver
from django_celery_beat.models import PeriodicTask, IntervalSchedule
class Project(models.Model):
    """Project whose scan schedule is mirrored by a ``PeriodicTask`` row.

    ``project_scan`` is the scan interval in hours and ``project_status``
    tells whether the periodic task for this project should be enabled.
    """

    project_name = models.CharField(max_length=200, unique=True)
    project_scan = models.IntegerField()  # scan interval, in hours
    project_status = models.BooleanField()  # True -> periodic task enabled

    def _periodic_task_name(self):
        """Return the ``PeriodicTask.name`` used for this project."""
        # NOTE(review): the name embeds ``project_name``; renaming a project
        # after creation changes this value and the existing task will no
        # longer be found. Kept as-is so already-created rows still match.
        return f'{self.project_name}-{self.id}'

    def set_periodic_task(self, task_name):
        """Create the ``PeriodicTask`` that runs ``task_name`` for this project.

        The task is created with ``enabled`` equal to ``project_status`` so a
        project saved as disabled does not start out with a running task.
        """
        schedule = self.get_or_create_interval()
        PeriodicTask.objects.create(
            interval=schedule,
            name=self._periodic_task_name(),
            task=task_name,
            enabled=self.project_status,
        )

    def get_or_create_interval(self):
        """Return the ``IntervalSchedule`` for "every ``project_scan`` hours"."""
        schedule, _ = IntervalSchedule.objects.get_or_create(
            every=self.project_scan,
            period=IntervalSchedule.HOURS,
        )
        return schedule

    def get_periodic_task(self, task_name):
        """Return this project's ``PeriodicTask`` for ``task_name``.

        Raises ``PeriodicTask.DoesNotExist`` when no such task was created.
        """
        # Look the task up by name and task only. Filtering on the interval
        # derived from the *current* ``project_scan`` made the lookup fail
        # with DoesNotExist as soon as the scan interval was edited.
        return PeriodicTask.objects.get(
            name=self._periodic_task_name(),
            task=task_name,
        )

    def sync_disable_enable_task(self, task_name):
        """Push ``project_status`` and ``project_scan`` to the periodic task."""
        periodic_task = self.get_periodic_task(task_name)
        periodic_task.enabled = self.project_status
        # Keep the schedule in step with an edited ``project_scan``.
        periodic_task.interval = self.get_or_create_interval()
        periodic_task.save()
@receiver(post_save, sender=Project)
def set_or_sync_periodic_task(sender, instance=None, created=False, **kwargs):
    """Keep the project's periodic task in step with every ``Project`` save.

    On the first save (``created`` is true) the periodic task is created via
    ``set_periodic_task``; on later saves ``sync_disable_enable_task`` is
    called instead.
    """
    if not created:
        instance.sync_disable_enable_task(task_name='project_tasks')
        return
    instance.set_periodic_task(task_name='project_tasks')
这样做的效果:
当您创建新的 Project 实例时,post_save 信号会调用 set_periodic_task 方法,
为该实例保存一个新的定期任务。
如果您想禁用(disable)或启用(enable)某个实例的定期任务,
只需修改 project_status 并保存即可;
这会触发 sync_disable_enable_task 方法,
从而启用或禁用对应的定期任务。
如果你想传递参数,你可以这样做:
# Arguments for the task are passed as JSON-encoded strings: ``args`` holds
# the positional arguments and ``kwargs`` the keyword arguments.
PeriodicTask.objects.create(
    interval=schedule,
    name=f'{self.project_name}-{self.id}',
    task='proj.tasks.import_contacts',
    args=json.dumps(['arg1', 'arg2']),
    kwargs=json.dumps({
        # The closing quote was missing here, which made the snippet a
        # SyntaxError.
        'some_kwarg': '123',
    }),
)