芹菜节拍时间表:运行 开始芹菜节拍时立即执行任务?
celery beat schedule: run task instantly when start celery beat?
如果我创建一个celery beat schedule,使用timedelta(days=1)
,第一个任务将在24小时后执行,引用celery beat文档:
Using a timedelta for the schedule means the task will be sent in 30 second intervals (the first task will be sent 30 seconds after celery beat starts, and then every 30 seconds after the last run).
但事实是,在很多情况下,调度程序 运行 启动任务实际上很重要,但我没有找到允许我 运行 celery 启动后立即执行任务,我没有仔细阅读,还是 celery 缺少此功能?
最好的想法是创建一个在完成任务后自行安排任务的实现。另外,创建一个入口锁,这样任务就不能在同一时刻执行多次。
触发执行一次。
在这种情况下,
- 您不需要 celerybeat 进程
- 任务保证执行
我决定我可以为每个任务声明一个实例,并在 celery 启动时执行它们。我根本不喜欢这个,因为它使开始芹菜节拍非常慢(如果你有慢 PeriodicTask
),但它做了我想要的。
只需将此添加到 tasks.py
的末尾:
########### spawn all tasks at launch! ############
localmess = locals().values()
for obj in localmess:
if isclass(obj):
if obj is not PeriodicTask and issubclass(obj, PeriodicTask):
instance = obj()
logger.info('running {0}'.format(obj))
try:
instance.run()
except:
logger.warn('task fail: {0}'.format(obj))
pass
######## all tasks must be decleared above! ########
当工作人员使用 worker_ready.connect
装饰器准备就绪时,您可以 运行 任务:
from celery.signals import worker_ready
@worker_ready.connect
def at_start(sender, **kwargs):
"""Run tasks at startup"""
with sender.app.connection() as conn:
sender.app.send_task("app.module.task", connection=conn)
致谢此答案:
如果我创建一个celery beat schedule,使用timedelta(days=1)
,第一个任务将在24小时后执行,引用celery beat文档:
Using a timedelta for the schedule means the task will be sent in 30 second intervals (the first task will be sent 30 seconds after celery beat starts, and then every 30 seconds after the last run).
但事实是,在很多情况下,调度程序 运行 启动任务实际上很重要,但我没有找到允许我 运行 celery 启动后立即执行任务,我没有仔细阅读,还是 celery 缺少此功能?
最好的想法是创建一个在完成任务后自行安排任务的实现。另外,创建一个入口锁,这样任务就不能在同一时刻执行多次。 触发执行一次。
在这种情况下,
- 您不需要 celerybeat 进程
- 任务保证执行
我决定我可以为每个任务声明一个实例,并在 celery 启动时执行它们。我根本不喜欢这个,因为它使开始芹菜节拍非常慢(如果你有慢 PeriodicTask
),但它做了我想要的。
只需将此添加到 tasks.py
的末尾:
########### spawn all tasks at launch! ############
localmess = locals().values()
for obj in localmess:
if isclass(obj):
if obj is not PeriodicTask and issubclass(obj, PeriodicTask):
instance = obj()
logger.info('running {0}'.format(obj))
try:
instance.run()
except:
logger.warn('task fail: {0}'.format(obj))
pass
######## all tasks must be decleared above! ########
当工作人员使用 worker_ready.connect
装饰器准备就绪时,您可以 运行 任务:
from celery.signals import worker_ready
@worker_ready.connect
def at_start(sender, **kwargs):
"""Run tasks at startup"""
with sender.app.connection() as conn:
sender.app.send_task("app.module.task", connection=conn)
致谢此答案: