celery Django task is not working in mock.patch how to get updated value of DB which is being performed in the celery task

我正在开发 django 应用程序,我制作了一个名为 service.py 的服务文件,在该文件中我调用了一个 celery 任务,即

# start celery task in the apps>myapp>tasks.py
def update_user(user_id):
    except Exception as e:
# end celery task in the apps>myapp>tasks.py

# start  test class in which i am calling celery task apps>myapp>service.py
from config.celery import app

class TestClass:
    def call_celery_task(self):
# end test class in which i am calling celery task apps>myapp>service.py

这个任务在我正常执行时运行良好通过代码和命令 TestClass.call_celery_task()

但是这里的测试用例有问题我正在编写测试用例,因为我想调用我的 celery 任务并想获取 celery 任务的数据库的更新值,我正在这样做,即

from unittest import mock
from django.test import TestCase
from django.contrib.auth.models import User

class TestTestClass(TestCase):
    def setUp(self):
        self.user = User.objects.create(username="test",first_name="test")

    def test_call_celery_task(self, mock_app):
        self.assertTrue(mock_app.send_task.called) #this is returning True 
        self.assertEqual(self.user.first_name, "") #this is returning old first name why ?

这里我没有更新 celery 任务完成的名字值。如果这是一个异步调用,那么请建议我在这里获取更新的值,谢谢。

不想在测试用例中手动调用celery任务。我只想使用 mock.patch.

上述测试的问题在于您模拟了 Celery 应用程序。因此,当 app.send_task 被调用时,它并没有执行实际的任务。您甚至可以打印 app.send_task 来查看其值。此外,在 celery 任务中添加 pdb.set_trace() 以验证在 运行 单元测试时未调用该方法。这就是名称的值未更新的原因。

我在这里找到了解决方案,在上面的问题中,芹菜应用程序被嘲笑了,这是错误的,您要做的就是只嘲笑函数中的任务。只需导入芹菜任务 - 从apps.myapp.tasks导入update_user

from django.test import TestCase
from django.contrib.auth.models import User
from apps.myapp.tasks import update_user

class TestTestClass(TestCase):
    def setUp(self):
        self.user = User.objects.create(username="test",first_name="test")

    def test_call_celery_task(self, mock_app):
        update_user.apply(args=(what ever args you have)).get()
        #here you will get updated value of user
