Django Celery 定期手动调用

Django Celery Invoking Periodically and Manually

我正在使用 Django 和 Celery 来安排任务。

tasks.py

@periodic_task(run_every=crontab(hour='00', minute='00', day_of_week="*"))
def hello_world():
    print('hello world!)

除了每天 运行 启用该功能外,我还需要能够手动调用它。如果用户按下前端的按钮,它将 运行。我能做到:

@periodic_task(run_every=crontab(hour='00', minute='00', day_of_week="*"))
def hello_world():
    print('hello world!)

@task()
def hello_world():
    print('hello world!)

但这与 DRY 背道而驰。此外,我可能有不止一种功能会出现相同的情况。我需要定期 运行 它,但也应要求。

我试过这个:

def hello_world():
    print('hello world!)

@periodic_task(run_every=crontab(hour='00', minute='00', day_of_week="*"))
hello_world()

@task
hello_world()

但这不起作用。我得到

invalid syntax

我无法在文档或其他 Whosebug 答案中找到这种情况。他们谈论从 shell 调用函数。帮助将不胜感激。

谢谢!

你不需要定义任务两次,你的周期性任务也可以用hello_world.delay()手动调用,所以你会:

@periodic_task(run_every=crontab(hour='00', minute='00', day_of_week='*'))
def hello_world():
    print('hello world!')

def on_button_press():
    hello_world.delay()