芹菜限制队列中特定任务的数量

Celery limit number of specific task in queue

我正在使用 Celery 3。1.x 有 2 个任务。第一个任务(TaskOne)在 Celery 启动时通过 celeryd_after_setup 信号入队:

@celeryd_after_setup.connect
def celeryd_after_setup(*args, **kwargs):
    TaskOne().apply_async(countdown=5)

当 TaskOne 为 运行 时,它会进行一些计算,然后将 TaskTwo 入队。想象一下以下工作流程:

所以队列中有 2 个 TaskTwo。这对我的工作流来说是个问题,因为我只希望队列中有一个 TaskTwo 并避免第二个入队。

我的问题:我怎样才能做到这一点?

使用 celery.app.control.Inspect.scheduled() (Docs) 我可以获得一个列表,其中包含已安排的任务,隐藏在列表和字典的组合中。这也许是一种方式,但是经历这样的结果感觉不太对。有没有更好的方法?

一个易于实施的解决方案是将 --purge 开关添加到您的 worker 命令。它将清除队列,工作人员开始时没有计划的工作。

但要注意:这是一种全局的、不可恢复的操作。当您依赖其他计划作业时,这不是您的解决方案。

在考虑了几个选项后,我选择了 app.control.inspect。 这不是一个非常漂亮的解决方案,但它确实有效:

# fetch all scheduled tasks
scheduled_tasks = inspect().scheduled()

# iterate the scheduled task values, see http://docs.celeryproject.org/en/latest/userguide/workers.html?highlight=revoke#dump-of-scheduled-eta-tasks
for task_values in iter(scheduled_tasks.values()):
    # task_values is a list of dicts
    for task in task_values:
        if task['request']['name'] == '{}.{}'.format(TaskTwo.__module__, TaskTwo.__name__):
            logger.info('TaskTwo is already scheduled, skipping additional run')
                return