如果尚未使用芹菜安排任务,则允许执行任务

Allow a task execution if it's not already scheduled using celery

我正在使用 Celery 来处理我正在开发的 Django 应用程序中的任务调度,我正在使用 Django 数据库进行测试。

我只是尝试了几种方法来处理任务的执行,前提是它尚未按此 article 中的建议进行安排或正在进行,但到目前为止没有任何效果。

像这样:

task.py

@task()
def add(x, y):
   return x + y

然后当你像下面这样调用它两次时:

import myapp.tasks.add

myapp.tasks.add.apply_async((2,2), task_id=1, countdown=15)
myapp.tasks.add.apply_async((2,2), task_id=2, countdown=15)

它应该允许基于 countdown=15 的一个实例。如果还有另一个 运行 或正在等待,我如何才能实现第二个调用永远不会执行它?

三思而后行!您可以在排队任务之前检查是否有任何任务running/waiting。

from celery.task.control import inspect

def is_running_waiting(task_name):
    """
    Check if a task is running or waiting.
    """
    scheduled_tasks = inspect().scheduled().values()[0]
    for task in scheduled_tasks:
        if task['request']['name'] == task_name:
            return True
    running_tasks = inspect().active().values()[0]
    for task in running_tasks:
        if task['request']['name'] == task_name:
            return True

现在如果你排队三个添加任务,第一个将排队等待执行,其余的不会排队。

for i in range(3):
    if not is_running_waiting('add'):
        add.apply_async((2,2), countdown=15)

已接受答案的一个问题是速度很慢。检查任务是否已经 运行ning 涉及调用代理,然后遍历 运行ning 和活动任务。如果你想快速排队任务,这是行不通的。此外,当前的解决方案有一个小的竞争条件,因为 2 个进程可能正在检查任务是否已同时排队(发现它不是),然后这将排队 2 个任务。

更好的解决方案是我所说的去抖动任务。基本上每次排队任务时都会增加一个计数器。当任务开始时,你递减它。使用redis然后都是原子的。

例如

排队任务:

conn = get_redis()
conn.incr(key)
task.apply_async(args=args, kwargs=kwargs, countdown=countdown)

然后在任务中,你有2个选项,你是想在第一个排队后15秒执行任务(throttle)还是在最后一个排队后15秒执行任务(debounce)。也就是说,如果我们继续尝试 运行 相同的任务,我们会延长计时器,还是只等待第一个任务 15 并忽略排队的其他任务。

两者都很容易支持,这是我们等待任务停止排队的去抖:

conn = get_redis()
counter = conn.decr(key)
if counter > 0:
    # task is queued
    return
# continue on to rest of task

油门版本:

counter = conn.getset(key, '0')
if counter == '0':
    # we already ran so ignore all the tasks that were queued since
    return
# continue on to task

与公认的解决方案相比,此解决方案的另一个好处是密钥完全在您的控制之下。因此,如果您希望执行相同的任务但只针对不同的任务执行一次 id/objects,例如,您可以将其合并到您的密钥中。

更新

想多了,你可以更轻松地完成节流版本,而无需排队任务。

Throttle v2(排队任务时)

conn = get_redis()
counter = conn.incr(key)
if counter == 1:
    # queue up the task only the first time
    task.apply_async(args=args, kwargs=kwargs, countdown=countdown)

然后在任务中将计数器设置回 0。

你甚至不必使用计数器,如果你有一个集合,你可以将密钥添加到集合中。如果返回 1,则密钥不在集合中,您应该将任务排队。如果返回 0,则密钥已在集合中,因此不要将任务排队。