如何运行 Celery 在每个月底定期执行任务

How to run Celery Periodic task at every end of the month

基本上,我有一个生成发票的任务。所以我在 tasks.py 文件中创建了一个函数 generate_invoice()

现在,我正在使用 @periodic_task 来调用函数。但是我只需要在 下午 11:55 左右的最后一天完成任务 运行 - 11:57 pm.

我正在使用此函数获取月份的最后一个日期:

def get_last_date():
        today = timezone.now()
        year = today.year
        month = today.month
        last_date = calendar.monthrange(year, month)[1]
        return str(last_date)

我的任务代码如下所示:

@periodic_task(run_every=(crontab(day_of_month=get_last_date())), name="invoice_simulation", ignore_result=True)
def invoice_simulation():
    generate_invoice()

但这不起作用!

或者有什么更好的方法来实现这个功能,请指点。

按如下方式更新您的 get_last_date() 函数。

def get_last_date():
    today = datetime.date.today()
    year = today.year
    month = today.month
    last_date = calendar.monthrange(year, month)
    return str(last_date)

一个更简单的解决方案是每天 11:55 下午 运行 计划,如果今天是该月的最后一天,则检查任务内部。如果是这样,生成发票。如果不是,那么什么都不做。

类似于:

@periodic_task(run_every=(crontab(hour=23, minute=55)), name="invoice_simulation", ignore_result=True)
def invoice_simulation():
    if timezone.now().day == get_last_date():
        generate_invoice()

只要确保 timezonecrontab 不相互冲突即可。两者都必须是 TZ 感知(或 UTC),或者两者都使用朴素时间。