如何每天早上 6 点和下午 6 点 运行 Django 芹菜任务?
How to run a Django celery task every 6am and 6pm daily?
您好,我的项目中有 Django Celery。目前每天 运行 每天 12 小时一班(Midnight/00:00am 和 12:00pm)。但我希望它每天早上 6 点和下午 6 点 运行。我怎样才能做到这一点?提前致谢。
任务:
from celery.task import periodic_task
from celery.schedules import crontab
from xxx.views import update_xx_task, execute_yy_task
@periodic_task(run_every=crontab(minute=0, hour='*/12'),
queue='nonsdepdb3115', options={'queue': 'nonsdepdb3115'})
def xxx_execute_xx_task():
execute_yy_task()
update_xx_task()
来自 documentation,在示例中 table - 您可以看到您可以在多个小时内(24 小时制)通过。所以,正如你想在早上 6 点和下午 6 点 (1800) 运行 它:
@periodic_task(run_every=crontab(minute=0, hour='6,18'))
最好顺便做一下:
In your celery.py
file
import os
from celery import Celery
from celery.schedules import crontab
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'Moex.settings')
app = Celery('Moex',
backend='rpc://',
broker='pyamqp://', )
app.config_from_object('django.conf:settings', namespace='CELERY', )
app.conf.update(result_expires=3600,
enable_utc=True,
timezone='Europe/Moscow', )
app.conf.beat_schedule = {
"every day between 6 AM & 18 PM": {
"task": "xxx_execute_xx_task", # <---- Name of task
"schedule": crontab(hour='6, 18',
minute=0,
)
},
"every minute": {
"task": "check_if_need_update_prices",
'schedule': 60.0,
}
}
app.autodiscover_tasks()
Then in your tasks.py file
import requests
from celery import shared_task, states
@shared_task(bind=True,
name='xxx_execute_xx_task',
max_retries=3,
soft_time_limit=20)
def xxx_execute_xx_task(self):
# do something
data = requests.get(url='
'how-to-run-a-django-celery-task-every-6am-and-6pm-daily')
if data.status_code == 200:
task.update_state(state=states.SUCCESS)
if data:
self.update_state(state=states.SUCCESS)
else:
self.update_state(state=states.FAILURE)
您好,我的项目中有 Django Celery。目前每天 运行 每天 12 小时一班(Midnight/00:00am 和 12:00pm)。但我希望它每天早上 6 点和下午 6 点 运行。我怎样才能做到这一点?提前致谢。
任务:
from celery.task import periodic_task
from celery.schedules import crontab
from xxx.views import update_xx_task, execute_yy_task
@periodic_task(run_every=crontab(minute=0, hour='*/12'),
queue='nonsdepdb3115', options={'queue': 'nonsdepdb3115'})
def xxx_execute_xx_task():
execute_yy_task()
update_xx_task()
来自 documentation,在示例中 table - 您可以看到您可以在多个小时内(24 小时制)通过。所以,正如你想在早上 6 点和下午 6 点 (1800) 运行 它:
@periodic_task(run_every=crontab(minute=0, hour='6,18'))
最好顺便做一下:
In your
celery.py
file
import os
from celery import Celery
from celery.schedules import crontab
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'Moex.settings')
app = Celery('Moex',
backend='rpc://',
broker='pyamqp://', )
app.config_from_object('django.conf:settings', namespace='CELERY', )
app.conf.update(result_expires=3600,
enable_utc=True,
timezone='Europe/Moscow', )
app.conf.beat_schedule = {
"every day between 6 AM & 18 PM": {
"task": "xxx_execute_xx_task", # <---- Name of task
"schedule": crontab(hour='6, 18',
minute=0,
)
},
"every minute": {
"task": "check_if_need_update_prices",
'schedule': 60.0,
}
}
app.autodiscover_tasks()
Then in your tasks.py file
import requests
from celery import shared_task, states
@shared_task(bind=True,
name='xxx_execute_xx_task',
max_retries=3,
soft_time_limit=20)
def xxx_execute_xx_task(self):
# do something
data = requests.get(url='
'how-to-run-a-django-celery-task-every-6am-and-6pm-daily')
if data.status_code == 200:
task.update_state(state=states.SUCCESS)
if data:
self.update_state(state=states.SUCCESS)
else:
self.update_state(state=states.FAILURE)