如何使用 transaction.on_commit 编写 Celery 组或链

How to write a Celery group or chain using transaction.on_commit

我正在使用 django 3.0.2、python 3.6.6、celery 4.3.0 和 redis 服务器 4.0.9。我想创建一些任务链和任务组,在模型保存后 运行 (transaction.on_commit)。我可以让单个任务以这种方式工作,但我似乎无法想出正确的咒语来使一个组或链使用 transaction.on_commit。

边栏:我在使用 transaction.on_commit 时将 celery 任务 class 扩展到 "hide" lamba 东西,因为我总是必须查找正确的格式 - 请参阅 https://browniebroke.com/making-celery-work-nicely-with-django-transactions / 了解详情。在我的代码中,我使用 task.delay_on_commit 替换 transaction.on_commit 和 lambda.

回到正文。我试过这段代码,它说我的任务中的对象查询不存在。

jobs = group(tasks.clean_document_image.delay_on_commit(self.document_id, key, values[key]) for key in values).apply_async()

clean_document_image 获取上传的图像(document_id 对象的 FileField)并创建多个不同大小的图像副本(values 字典中的键是副本的宽度,并且值字典中的值只是大小的字符串名称——例如,thumb、xsmall、xxxlarge 等)。请注意,我试图延迟组中的每个元素,直到完成保存 FileField 的事务。

当我 运行 这个 "emulation" 组时,它按预期工作,具有 FileField 的对象已保存,并且具有 document_i 的查找有效。

for key in values:
    tasks.clean_document_image.delay_on_commit(self.document_id, key, values[key])

创建一组直到保存 django 对象才开始的任务的正确方法是什么?换句话说,如何将 transaction.on_commit 合并到一组任务中?

谢谢!

马克

我发现这行得通:

from django.db import transaction
transaction.on_commit(lambda: chain(task1.si(args), 
                                    task2.si(args), 
                                    task3.si(args)).delay())

.si 适用于没有 return 值作为链中下一个任务输入的任务。如果你想让 task1 的输出被 task2 使用,那么使用 task1.s(args)。 task2 的第一个参数必须是 task1 的输出值。