芹菜限制队列中特定任务的数量
Celery limit number of specific task in queue
我正在使用 Celery 3。1.x 有 2 个任务。第一个任务(TaskOne)在 Celery 启动时通过 celeryd_after_setup 信号入队:
@celeryd_after_setup.connect
def celeryd_after_setup(*args, **kwargs):
TaskOne().apply_async(countdown=5)
当 TaskOne 为 运行 时,它会进行一些计算,然后将 TaskTwo 入队。想象一下以下工作流程:
- 我启动 celery,因此信号被触发并且 TaskOne 入队
- 倒计时后 (5) TaskTwo 入队
- 然后我停止 celery(TaskTwo 仍在队列中)
- 之后我重新启动芹菜
- 工作流再次 运行,TaskTwo 再次入队
所以队列中有 2 个 TaskTwo。这对我的工作流来说是个问题,因为我只希望队列中有一个 TaskTwo 并避免第二个入队。
我的问题:我怎样才能做到这一点?
使用 celery.app.control.Inspect.scheduled()
(Docs) 我可以获得一个列表,其中包含已安排的任务,隐藏在列表和字典的组合中。这也许是一种方式,但是经历这样的结果感觉不太对。有没有更好的方法?
一个易于实施的解决方案是将 --purge 开关添加到您的 worker 命令。它将清除队列,工作人员开始时没有计划的工作。
但要注意:这是一种全局的、不可恢复的操作。当您依赖其他计划作业时,这不是您的解决方案。
在考虑了几个选项后,我选择了 app.control.inspect。
这不是一个非常漂亮的解决方案,但它确实有效:
# fetch all scheduled tasks
scheduled_tasks = inspect().scheduled()
# iterate the scheduled task values, see http://docs.celeryproject.org/en/latest/userguide/workers.html?highlight=revoke#dump-of-scheduled-eta-tasks
for task_values in iter(scheduled_tasks.values()):
# task_values is a list of dicts
for task in task_values:
if task['request']['name'] == '{}.{}'.format(TaskTwo.__module__, TaskTwo.__name__):
logger.info('TaskTwo is already scheduled, skipping additional run')
return
我正在使用 Celery 3。1.x 有 2 个任务。第一个任务(TaskOne)在 Celery 启动时通过 celeryd_after_setup 信号入队:
@celeryd_after_setup.connect
def celeryd_after_setup(*args, **kwargs):
TaskOne().apply_async(countdown=5)
当 TaskOne 为 运行 时,它会进行一些计算,然后将 TaskTwo 入队。想象一下以下工作流程:
- 我启动 celery,因此信号被触发并且 TaskOne 入队
- 倒计时后 (5) TaskTwo 入队
- 然后我停止 celery(TaskTwo 仍在队列中)
- 之后我重新启动芹菜
- 工作流再次 运行,TaskTwo 再次入队
所以队列中有 2 个 TaskTwo。这对我的工作流来说是个问题,因为我只希望队列中有一个 TaskTwo 并避免第二个入队。
我的问题:我怎样才能做到这一点?
使用 celery.app.control.Inspect.scheduled()
(Docs) 我可以获得一个列表,其中包含已安排的任务,隐藏在列表和字典的组合中。这也许是一种方式,但是经历这样的结果感觉不太对。有没有更好的方法?
一个易于实施的解决方案是将 --purge 开关添加到您的 worker 命令。它将清除队列,工作人员开始时没有计划的工作。
但要注意:这是一种全局的、不可恢复的操作。当您依赖其他计划作业时,这不是您的解决方案。
在考虑了几个选项后,我选择了 app.control.inspect。 这不是一个非常漂亮的解决方案,但它确实有效:
# fetch all scheduled tasks
scheduled_tasks = inspect().scheduled()
# iterate the scheduled task values, see http://docs.celeryproject.org/en/latest/userguide/workers.html?highlight=revoke#dump-of-scheduled-eta-tasks
for task_values in iter(scheduled_tasks.values()):
# task_values is a list of dicts
for task in task_values:
if task['request']['name'] == '{}.{}'.format(TaskTwo.__module__, TaskTwo.__name__):
logger.info('TaskTwo is already scheduled, skipping additional run')
return