如何使用 transaction.on_commit 编写 Celery 组或链
How to write a Celery group or chain using transaction.on_commit
我正在使用 django 3.0.2、python 3.6.6、celery 4.3.0 和 redis 服务器 4.0.9。我想创建一些任务链和任务组,在模型保存后 运行 (transaction.on_commit)。我可以让单个任务以这种方式工作,但我似乎无法想出正确的咒语来使一个组或链使用 transaction.on_commit。
边栏:我在使用 transaction.on_commit 时将 celery 任务 class 扩展到 "hide" lamba 东西,因为我总是必须查找正确的格式 - 请参阅 https://browniebroke.com/making-celery-work-nicely-with-django-transactions / 了解详情。在我的代码中,我使用 task.delay_on_commit 替换 transaction.on_commit 和 lambda.
回到正文。我试过这段代码,它说我的任务中的对象查询不存在。
jobs = group(tasks.clean_document_image.delay_on_commit(self.document_id, key, values[key]) for key in values).apply_async()
clean_document_image 获取上传的图像(document_id 对象的 FileField)并创建多个不同大小的图像副本(values 字典中的键是副本的宽度,并且值字典中的值只是大小的字符串名称——例如,thumb、xsmall、xxxlarge 等)。请注意,我试图延迟组中的每个元素,直到完成保存 FileField 的事务。
当我 运行 这个 "emulation" 组时,它按预期工作,具有 FileField 的对象已保存,并且具有 document_i 的查找有效。
for key in values:
tasks.clean_document_image.delay_on_commit(self.document_id, key, values[key])
创建一组直到保存 django 对象才开始的任务的正确方法是什么?换句话说,如何将 transaction.on_commit 合并到一组任务中?
谢谢!
马克
我发现这行得通:
from django.db import transaction
transaction.on_commit(lambda: chain(task1.si(args),
task2.si(args),
task3.si(args)).delay())
.si 适用于没有 return 值作为链中下一个任务输入的任务。如果你想让 task1 的输出被 task2 使用,那么使用 task1.s(args)。 task2 的第一个参数必须是 task1 的输出值。
我正在使用 django 3.0.2、python 3.6.6、celery 4.3.0 和 redis 服务器 4.0.9。我想创建一些任务链和任务组,在模型保存后 运行 (transaction.on_commit)。我可以让单个任务以这种方式工作,但我似乎无法想出正确的咒语来使一个组或链使用 transaction.on_commit。
边栏:我在使用 transaction.on_commit 时将 celery 任务 class 扩展到 "hide" lamba 东西,因为我总是必须查找正确的格式 - 请参阅 https://browniebroke.com/making-celery-work-nicely-with-django-transactions / 了解详情。在我的代码中,我使用 task.delay_on_commit 替换 transaction.on_commit 和 lambda.
回到正文。我试过这段代码,它说我的任务中的对象查询不存在。
jobs = group(tasks.clean_document_image.delay_on_commit(self.document_id, key, values[key]) for key in values).apply_async()
clean_document_image 获取上传的图像(document_id 对象的 FileField)并创建多个不同大小的图像副本(values 字典中的键是副本的宽度,并且值字典中的值只是大小的字符串名称——例如,thumb、xsmall、xxxlarge 等)。请注意,我试图延迟组中的每个元素,直到完成保存 FileField 的事务。
当我 运行 这个 "emulation" 组时,它按预期工作,具有 FileField 的对象已保存,并且具有 document_i 的查找有效。
for key in values:
tasks.clean_document_image.delay_on_commit(self.document_id, key, values[key])
创建一组直到保存 django 对象才开始的任务的正确方法是什么?换句话说,如何将 transaction.on_commit 合并到一组任务中?
谢谢!
马克
我发现这行得通:
from django.db import transaction
transaction.on_commit(lambda: chain(task1.si(args),
task2.si(args),
task3.si(args)).delay())
.si 适用于没有 return 值作为链中下一个任务输入的任务。如果你想让 task1 的输出被 task2 使用,那么使用 task1.s(args)。 task2 的第一个参数必须是 task1 的输出值。