在同步模式下调用基于 Celery class 的任务
Call Celery class-based task in sync mode
我有芹菜class基于任务
from celery import Task
from django.db import transaction
from config import celery_app
class RefreshData(Task):
name = "refresh-data"
@transaction.atomic
def run(self, *args, **kwargs):
SomeClass().some_fuc()
celery_app.register_task(RefreshData)
现在我想 运行 它处于同步模式(在测试中)。怎么做?
通过将task_always_eager设置为True,任务将立即(同步)执行,而不是发送到队列(异步),允许您调试任务中的代码。
task_always_eager 默认为 False 以帮助防止在生产中无意中激活它。
诀窍是mock.patch
以我们同步调用的方式调用任务。
同步调用任务RefreshData().run(*args, **kwargs)
我有芹菜class基于任务
from celery import Task
from django.db import transaction
from config import celery_app
class RefreshData(Task):
name = "refresh-data"
@transaction.atomic
def run(self, *args, **kwargs):
SomeClass().some_fuc()
celery_app.register_task(RefreshData)
现在我想 运行 它处于同步模式(在测试中)。怎么做?
通过将task_always_eager设置为True,任务将立即(同步)执行,而不是发送到队列(异步),允许您调试任务中的代码。
task_always_eager 默认为 False 以帮助防止在生产中无意中激活它。
诀窍是mock.patch
以我们同步调用的方式调用任务。
同步调用任务RefreshData().run(*args, **kwargs)