运行 定时任务存储在数据库中

Running periodic task at time stored in database

目前,我在我的 Azure 实例上使用 Job Scheduler 设置了定期任务。这些在固定时间触发 API (Django) 端点。

我想让这些时间动态化(不适用于此解决方案)。计划是直接从 Django 触发这些任务。计划时间将存储在我的数据库中 (MySQL) 并被检索以创建计划的作业。当这些值改变时,调度程序也应该相应地改变。

查看 Celery 后,似乎使用 periodic tasks crontab schedules 可行。 使用这个,是否可以根据我的数据库中的值设置我的预定时间?

看来我还需要一个 Redis 实例。 因为我只会将 Celery 用于周期性任务,它仍然是正确的方法吗?

Cron 被认为是 运行 定期执行的任务,因为它具有每天、每小时、每 15 分钟的简单配置选项...
添加 cron 作业并不是在具体日期( 或日期时间

将动态任务配置为 运行 的好方法

您可以使用 schedule module as explained in this 答案。

还有另一个库 apscheduler,但请检查最后一个版本是否适用于 python3(如果您使用它

from datetime import date
from apscheduler.scheduler import Scheduler

# Start the scheduler
sched = Scheduler()
sched.start()

# Define the function that is to be executed
def my_job(text):
    print text

# The job will be executed on November 6th, 2009
exec_date = date(2009, 11, 6)

# Store the job in a variable in case we want to cancel it
job = sched.add_date_job(my_job, exec_date, ['hello'])

我正在使用 DjangoCeleryRabbitMQpostgreSQL

我正在做你想做的事情。

画中画:celeryflower

您需要一个 Celery conf 文件(在您的 settings.py 文件夹中):

你要加的是beat_schedule :

app.conf.beat_schedule = {
    'task-name': {
        'task': 'myapp.tasks.task_name',
        'schedule': crontab(minute=30, hour=5, day_of_week='mon-fri'),
    },
}

这将在您的数据库中添加一个条目以执行 task_name(周一至周五 5:30),您可以直接更改您的设置(重新加载 celerycelery beat之后)

我喜欢的是你可以很容易地添加一个安全的重试机制:

@app.task(bind=True, max_retries=50)
def task_name(self, entry_pk):
    entry = Entry.objects.get(pk=entry_pk)
    try:
        entry.method()
    except ValueError as e:
        raise self.retry(exc=e, countdown=5 * 60, queue="punctual_queue")

当我的 method() 引发 ValueError 我将在 5 分钟内重新执行此方法,最多尝试 50 次。

好的部分是您可以访问 Django 管理中的数据库:

你可以用flower检查任务是否执行(带回溯):

我每天有1000多个任务执行,你需要的是创建队列和worker。

我为此使用了 10 个工人(以备将来扩展):

celery multi start 10 -A MYAPP -Q:1-3 recurring_queue,punctual_queue -Q:4,5 punctual_queue -Q recurring_queue --pidfile="%n.pid"

以及启动任务的守护进程:

celery -A MYAPP beat -S django --detach

这对你来说可能有点矫枉过正,但他可以为你做更多的事情: - 发送电子邮件异步(如果失败,您可以更正并重新发送电子邮件) - 为用户上传和后处理异步 - 每一项需要时间但你不想等待的任务(你可以链接一个需要完成的任务 return 一个结果并将其用于另一个任务)

在没有外部库的情况下,设置一个每日 cron 脚本,从数据库中获取今天的任务,并使用线程处理到 运行 那个时间。

def take_a_background_nap(time_to_send):
    while datetime.datetime.now() < time_to_send:
        time.sleep(60)
    print('finally running')
    return


threadObj = threading.Thread(target=take_a_background_nap, args=[datetime.datetime(2020, 5, 11, 12, 53, 0)],)
threadObj.start()

线程数量不限,但要注意并发问题。