Celery 动态添加新任务

Celery Add new tasks dinamically

是否可以从另一个任务安排 celery 中的任务?

我有这个 Python 脚本:

import logging
from celery import Celery
from datetime import datetime

logger = logging.getLogger(__name__)

app = Celery('app', backend='amqp://', broker='pyamqp://guest@localhost:5672//')

@app.task()
def add(x, y):
    result = x + y
    logger.info(f'Add: {x} + {y} = {result}')
    return result

@app.task()
def setPeriodicTask():
     #option 1 
    app.add_periodic_task(10, add.s(30, 1))
     #option 2 
    app.conf.beat_schedule = {
        'add-every-5-seconds': {
            'task': 'app.add',
            'schedule': 5.0,
            'args': (now.hour, now.second)
        }
    }
    logger.info(f'setPeriodicTask succeeded')
    return 1

当我调用添加任务时,它工作正常。

如果我调用 setPeriodicTask 任务,它不会抛出任何错误,但不会安排添加任务。 我已经尝试了两种选择,none 有效:

  1. add_periodic_task
  2. 修改beat_schedule

如果我将这段代码添加到我的 Python 脚本中(正如我在 celery 文档中看到的那样):

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(5.0, add.s(10, 1))

我可以看到按预期安排的添加任务 运行。所以 celery 和 celery beat 似乎工作正常。 但是我想enable/disable点播任务

可能吗?如果是这样,我做错了什么?

以防其他人遇到此问题。

我最终使用了一个与 django -celery-beat 文档中提到的方法类似的数据库:django-celery-beat - Database-backed Periodic Tasks