Celery 使用 app.control.purge() 时 运行 任务会怎样?
Celery what happen to running tasks when using app.control.purge()?
目前我有一批芹菜 运行 django 像这样:
Celery.py:
from __future__ import absolute_import, unicode_literals
import os
import celery
from celery import Celery
from celery.schedules import crontab
import django
load_dotenv(os.path.join(os.path.dirname(os.path.dirname(__file__)), '.env'))
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'base.settings')
django.setup()
app = Celery('base')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
app.control.purge()
sender.add_periodic_task(30.0, check_loop.s())
recursion_function.delay() #need to use recursive because it need to wait for loop to finish(time can't be predict)
print("setup_periodic_tasks")
@app.task()
def check_loop():
.....
start = database start number
end = database end number
callling apis in a list from id=start to id=end
create objects
update database(start number = end, end number = end + 3)
....
@app.task()
def recursion_function(default_retry_delay=10):
.....
do some looping
....
#when finished, call itself again
recursion_function.apply_async(countdown=30)
我的目标是每当 celery 文件被编辑时它会重新启动所有任务 - 删除尚未执行的排队任务(我这样做是因为 recursion_function
将再次 运行 本身如果它完成了检查数据库中 table 的每条记录的工作,所以我不用担心它会中途停止)。
check_loop
函数将调用一个 api,该 api 已分页到 return 个对象列表,我会将其与 table 中的记录进行比较,如果匹配则创建另一个模型的新自定义记录
我的问题是,当我清除所有消息时,当前的 运行ning 任务会在中途停止还是会继续 运行ning?因为如果 check_loop
函数在 api 列表中途停止循环,那么它将再次 运行 循环,我将创建我不想要的新的重复记录
示例:
在 check_loop()
的运行任务期间,它在中途创建了对象(在 api 列表中,从元素 id=2 到 id=5),服务器重新启动 -> 运行,现在 check_loop()
运行 从开始(在 api 列表中从元素 id=2 到 id=5)并再次从该列表创建对象(我 100% 不想要)
是这样吗运行?我只需要确认一下
编辑:
https://docs.celeryproject.org/en/4.4.1/faq.html#how-do-i-purge-all-waiting-tasks
我添加了 app.control.purge()
,因为当我重新启动时,recursion_function
在 setup_periodic_tasks
中再次被调用,而前一个 recursion_function.apply_async(countdown=30)
中的 recursion_function
也会执行,所以它会自己倍增
是,工人将continue execution of currently running task除非工人也重新启动。
此外,Celery Way 是 总是期望 任务是 运行 在并发环境中考虑以下因素:
- 有很多任务运行并发
- 有很多celery worker在执行任务
- 相同的任务可能会再次 运行
- 同一任务的多个实例可能 运行 同时
- 任何任务都可以随时终止
即使您确定在您的环境中只有一个工作人员手动启动/停止并且这些不适用 - 任务应该以允许一切的方式创建这会发生。
一些有用的技巧:
- 使用数据库事务
- 使用锁定
- 将长运行宁的任务拆分成更快的任务
- 如果任务有中间值要保存或者它们很重要(即像某些 api 调用一样不可重现)并且它们在下一步中的处理需要时间 - 考虑拆分成几个链式任务
如果您一次只需要 运行 一个任务实例 - 使用某种 锁定 -在数据库或缓存中创建/更新锁定记录,以便其他人(相同任务)可以检查并知道此任务正在 运行ning 并且只是 return 或等待前一个任务完成。
即recursion_function
也可以是 Periodic Task
。作为周期性任务将确保每个间隔都是 运行,即使前一个任务因任何原因失败(因此无法像在常规非周期性任务中那样再次排队)。通过锁定,您可以确保一次只有一个 运行ning。
check_loop()
:
首先,建议将结果保存在数据库中的一个事务中,以确保在数据库中保存/修改全部或不保存。
您还可以保存一些标记,指示已保存对象的数量/状态,因此以后的任务可以只检查此标记,而不是每个对象。
或者在创建每个元素之前以某种方式检查它是否已经存在于数据库中。
我不会写像上面 Oleg 的优秀 post 那样的文章。答案很简单 - 所有 运行 任务将继续 运行。 purge
是关于队列中的所有任务,等待 Celery 工作人员挑选。
目前我有一批芹菜 运行 django 像这样:
Celery.py:
from __future__ import absolute_import, unicode_literals
import os
import celery
from celery import Celery
from celery.schedules import crontab
import django
load_dotenv(os.path.join(os.path.dirname(os.path.dirname(__file__)), '.env'))
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'base.settings')
django.setup()
app = Celery('base')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
app.control.purge()
sender.add_periodic_task(30.0, check_loop.s())
recursion_function.delay() #need to use recursive because it need to wait for loop to finish(time can't be predict)
print("setup_periodic_tasks")
@app.task()
def check_loop():
.....
start = database start number
end = database end number
callling apis in a list from id=start to id=end
create objects
update database(start number = end, end number = end + 3)
....
@app.task()
def recursion_function(default_retry_delay=10):
.....
do some looping
....
#when finished, call itself again
recursion_function.apply_async(countdown=30)
我的目标是每当 celery 文件被编辑时它会重新启动所有任务 - 删除尚未执行的排队任务(我这样做是因为 recursion_function
将再次 运行 本身如果它完成了检查数据库中 table 的每条记录的工作,所以我不用担心它会中途停止)。
check_loop
函数将调用一个 api,该 api 已分页到 return 个对象列表,我会将其与 table 中的记录进行比较,如果匹配则创建另一个模型的新自定义记录
我的问题是,当我清除所有消息时,当前的 运行ning 任务会在中途停止还是会继续 运行ning?因为如果 check_loop
函数在 api 列表中途停止循环,那么它将再次 运行 循环,我将创建我不想要的新的重复记录
示例:
在 check_loop()
的运行任务期间,它在中途创建了对象(在 api 列表中,从元素 id=2 到 id=5),服务器重新启动 -> 运行,现在 check_loop()
运行 从开始(在 api 列表中从元素 id=2 到 id=5)并再次从该列表创建对象(我 100% 不想要)
是这样吗运行?我只需要确认一下
编辑:
https://docs.celeryproject.org/en/4.4.1/faq.html#how-do-i-purge-all-waiting-tasks
我添加了 app.control.purge()
,因为当我重新启动时,recursion_function
在 setup_periodic_tasks
中再次被调用,而前一个 recursion_function.apply_async(countdown=30)
中的 recursion_function
也会执行,所以它会自己倍增
是,工人将continue execution of currently running task除非工人也重新启动。
此外,Celery Way 是 总是期望 任务是 运行 在并发环境中考虑以下因素:
- 有很多任务运行并发
- 有很多celery worker在执行任务
- 相同的任务可能会再次 运行
- 同一任务的多个实例可能 运行 同时
- 任何任务都可以随时终止
即使您确定在您的环境中只有一个工作人员手动启动/停止并且这些不适用 - 任务应该以允许一切的方式创建这会发生。
一些有用的技巧:
- 使用数据库事务
- 使用锁定
- 将长运行宁的任务拆分成更快的任务
- 如果任务有中间值要保存或者它们很重要(即像某些 api 调用一样不可重现)并且它们在下一步中的处理需要时间 - 考虑拆分成几个链式任务
如果您一次只需要 运行 一个任务实例 - 使用某种 锁定 -在数据库或缓存中创建/更新锁定记录,以便其他人(相同任务)可以检查并知道此任务正在 运行ning 并且只是 return 或等待前一个任务完成。
即recursion_function
也可以是 Periodic Task
。作为周期性任务将确保每个间隔都是 运行,即使前一个任务因任何原因失败(因此无法像在常规非周期性任务中那样再次排队)。通过锁定,您可以确保一次只有一个 运行ning。
check_loop()
:
首先,建议将结果保存在数据库中的一个事务中,以确保在数据库中保存/修改全部或不保存。
您还可以保存一些标记,指示已保存对象的数量/状态,因此以后的任务可以只检查此标记,而不是每个对象。
或者在创建每个元素之前以某种方式检查它是否已经存在于数据库中。
我不会写像上面 Oleg 的优秀 post 那样的文章。答案很简单 - 所有 运行 任务将继续 运行。 purge
是关于队列中的所有任务,等待 Celery 工作人员挑选。