在同步模式下调用基于 Celery class 的任务

Call Celery class-based task in sync mode

我有芹菜class基于任务

from celery import Task
from django.db import transaction

from config import celery_app


class RefreshData(Task):
    name = "refresh-data"

    @transaction.atomic
    def run(self, *args, **kwargs):
        SomeClass().some_fuc()

celery_app.register_task(RefreshData)

现在我想 运行 它处于同步模式(在测试中)。怎么做?

通过将task_always_eager设置为True,任务将立即(同步)执行,而不是发送到队列(异步),允许您调试任务中的代码。

task_always_eager 默认为 False 以帮助防止在生产中无意中激活它。

参考https://docs.celeryq.dev/en/stable/userguide/configuration.html?highlight=task_always_eager#std-setting-task_always_eager

诀窍是mock.patch以我们同步调用的方式调用任务。

同步调用任务RefreshData().run(*args, **kwargs)