如何在 Django 中测试芹菜 periodic_task?
How to test celery periodic_task in Django?
我有一个简单的周期性任务:
from celery.decorators import periodic_task
from celery.task.schedules import crontab
from .models import Subscription
@periodic_task(run_every=crontab(minute=0, hour=0))
def deactivate_subscriptions():
for subscription in Subscription.objects.filter(is_expired=True):
print(subscription)
subscription.is_active = False
subscription.can_activate = False
subscription.save()
我想用测试来覆盖它。
我找到了有关如何测试简单任务的信息,例如@shared_task,但我找不到测试示例 @periodic_task
使用函数apply()
。 Documentation for apply()
.
当使用装饰器定义周期性任务时,您可以通过以下方式访问 crontab 配置:
tasks.py
@periodic_task(
run_every=(crontab(minute="*/1")),
)
def my_task():
pass
一些你需要访问的文件
from .tasks import my_task
crontab = my_task.run_every
hours_when_it_will_run = crontab.hour
minutes_when_it_will_run = crontab.minute
day_of_the_week_when_it_will_run = crontab.day_of_week
day_of_the_month_when_it_will_run = crontab.day_of_month
month_of_year_when_it_will_run = crontab.month_of_year
通过这种方式,您可以访问何时执行任务,并检查您的测试是否存在预期时间。
我有一个简单的周期性任务:
from celery.decorators import periodic_task
from celery.task.schedules import crontab
from .models import Subscription
@periodic_task(run_every=crontab(minute=0, hour=0))
def deactivate_subscriptions():
for subscription in Subscription.objects.filter(is_expired=True):
print(subscription)
subscription.is_active = False
subscription.can_activate = False
subscription.save()
我想用测试来覆盖它。
我找到了有关如何测试简单任务的信息,例如@shared_task,但我找不到测试示例 @periodic_task
使用函数apply()
。 Documentation for apply()
.
当使用装饰器定义周期性任务时,您可以通过以下方式访问 crontab 配置:
tasks.py
@periodic_task(
run_every=(crontab(minute="*/1")),
)
def my_task():
pass
一些你需要访问的文件
from .tasks import my_task
crontab = my_task.run_every
hours_when_it_will_run = crontab.hour
minutes_when_it_will_run = crontab.minute
day_of_the_week_when_it_will_run = crontab.day_of_week
day_of_the_month_when_it_will_run = crontab.day_of_month
month_of_year_when_it_will_run = crontab.month_of_year
通过这种方式,您可以访问何时执行任务,并检查您的测试是否存在预期时间。