如果尚未使用芹菜安排任务,则允许执行任务
Allow a task execution if it's not already scheduled using celery
我正在使用 Celery 来处理我正在开发的 Django 应用程序中的任务调度,我正在使用 Django 数据库进行测试。
我只是尝试了几种方法来处理任务的执行,前提是它尚未按此 article 中的建议进行安排或正在进行,但到目前为止没有任何效果。
像这样:
task.py
@task()
def add(x, y):
return x + y
然后当你像下面这样调用它两次时:
import myapp.tasks.add
myapp.tasks.add.apply_async((2,2), task_id=1, countdown=15)
myapp.tasks.add.apply_async((2,2), task_id=2, countdown=15)
它应该允许基于 countdown=15
的一个实例。如果还有另一个 运行 或正在等待,我如何才能实现第二个调用永远不会执行它?
三思而后行!您可以在排队任务之前检查是否有任何任务running/waiting。
from celery.task.control import inspect
def is_running_waiting(task_name):
"""
Check if a task is running or waiting.
"""
scheduled_tasks = inspect().scheduled().values()[0]
for task in scheduled_tasks:
if task['request']['name'] == task_name:
return True
running_tasks = inspect().active().values()[0]
for task in running_tasks:
if task['request']['name'] == task_name:
return True
现在如果你排队三个添加任务,第一个将排队等待执行,其余的不会排队。
for i in range(3):
if not is_running_waiting('add'):
add.apply_async((2,2), countdown=15)
已接受答案的一个问题是速度很慢。检查任务是否已经 运行ning 涉及调用代理,然后遍历 运行ning 和活动任务。如果你想快速排队任务,这是行不通的。此外,当前的解决方案有一个小的竞争条件,因为 2 个进程可能正在检查任务是否已同时排队(发现它不是),然后这将排队 2 个任务。
更好的解决方案是我所说的去抖动任务。基本上每次排队任务时都会增加一个计数器。当任务开始时,你递减它。使用redis然后都是原子的。
例如
排队任务:
conn = get_redis()
conn.incr(key)
task.apply_async(args=args, kwargs=kwargs, countdown=countdown)
然后在任务中,你有2个选项,你是想在第一个排队后15秒执行任务(throttle)还是在最后一个排队后15秒执行任务(debounce)。也就是说,如果我们继续尝试 运行 相同的任务,我们会延长计时器,还是只等待第一个任务 15 并忽略排队的其他任务。
两者都很容易支持,这是我们等待任务停止排队的去抖:
conn = get_redis()
counter = conn.decr(key)
if counter > 0:
# task is queued
return
# continue on to rest of task
油门版本:
counter = conn.getset(key, '0')
if counter == '0':
# we already ran so ignore all the tasks that were queued since
return
# continue on to task
与公认的解决方案相比,此解决方案的另一个好处是密钥完全在您的控制之下。因此,如果您希望执行相同的任务但只针对不同的任务执行一次 id/objects,例如,您可以将其合并到您的密钥中。
更新
想多了,你可以更轻松地完成节流版本,而无需排队任务。
Throttle v2(排队任务时)
conn = get_redis()
counter = conn.incr(key)
if counter == 1:
# queue up the task only the first time
task.apply_async(args=args, kwargs=kwargs, countdown=countdown)
然后在任务中将计数器设置回 0。
你甚至不必使用计数器,如果你有一个集合,你可以将密钥添加到集合中。如果返回 1,则密钥不在集合中,您应该将任务排队。如果返回 0,则密钥已在集合中,因此不要将任务排队。
我正在使用 Celery 来处理我正在开发的 Django 应用程序中的任务调度,我正在使用 Django 数据库进行测试。
我只是尝试了几种方法来处理任务的执行,前提是它尚未按此 article 中的建议进行安排或正在进行,但到目前为止没有任何效果。
像这样:
task.py
@task()
def add(x, y):
return x + y
然后当你像下面这样调用它两次时:
import myapp.tasks.add
myapp.tasks.add.apply_async((2,2), task_id=1, countdown=15)
myapp.tasks.add.apply_async((2,2), task_id=2, countdown=15)
它应该允许基于 countdown=15
的一个实例。如果还有另一个 运行 或正在等待,我如何才能实现第二个调用永远不会执行它?
三思而后行!您可以在排队任务之前检查是否有任何任务running/waiting。
from celery.task.control import inspect
def is_running_waiting(task_name):
"""
Check if a task is running or waiting.
"""
scheduled_tasks = inspect().scheduled().values()[0]
for task in scheduled_tasks:
if task['request']['name'] == task_name:
return True
running_tasks = inspect().active().values()[0]
for task in running_tasks:
if task['request']['name'] == task_name:
return True
现在如果你排队三个添加任务,第一个将排队等待执行,其余的不会排队。
for i in range(3):
if not is_running_waiting('add'):
add.apply_async((2,2), countdown=15)
已接受答案的一个问题是速度很慢。检查任务是否已经 运行ning 涉及调用代理,然后遍历 运行ning 和活动任务。如果你想快速排队任务,这是行不通的。此外,当前的解决方案有一个小的竞争条件,因为 2 个进程可能正在检查任务是否已同时排队(发现它不是),然后这将排队 2 个任务。
更好的解决方案是我所说的去抖动任务。基本上每次排队任务时都会增加一个计数器。当任务开始时,你递减它。使用redis然后都是原子的。
例如
排队任务:
conn = get_redis()
conn.incr(key)
task.apply_async(args=args, kwargs=kwargs, countdown=countdown)
然后在任务中,你有2个选项,你是想在第一个排队后15秒执行任务(throttle)还是在最后一个排队后15秒执行任务(debounce)。也就是说,如果我们继续尝试 运行 相同的任务,我们会延长计时器,还是只等待第一个任务 15 并忽略排队的其他任务。
两者都很容易支持,这是我们等待任务停止排队的去抖:
conn = get_redis()
counter = conn.decr(key)
if counter > 0:
# task is queued
return
# continue on to rest of task
油门版本:
counter = conn.getset(key, '0')
if counter == '0':
# we already ran so ignore all the tasks that were queued since
return
# continue on to task
与公认的解决方案相比,此解决方案的另一个好处是密钥完全在您的控制之下。因此,如果您希望执行相同的任务但只针对不同的任务执行一次 id/objects,例如,您可以将其合并到您的密钥中。
更新
想多了,你可以更轻松地完成节流版本,而无需排队任务。
Throttle v2(排队任务时)
conn = get_redis()
counter = conn.incr(key)
if counter == 1:
# queue up the task only the first time
task.apply_async(args=args, kwargs=kwargs, countdown=countdown)
然后在任务中将计数器设置回 0。
你甚至不必使用计数器,如果你有一个集合,你可以将密钥添加到集合中。如果返回 1,则密钥不在集合中,您应该将任务排队。如果返回 0,则密钥已在集合中,因此不要将任务排队。