在测试我的 Websocket 消费者时,未创建模型对象(Django Channels)

While testing my Websocket Consumer a model object is not created (Django Channels)

我是 Django Channels 的新手,我正在尝试构建一个简单的聊天应用程序。但是当我尝试测试我的异步 Websocket 消费者时,我 运行 出现以下异常:chat.models.RoomModel.DoesNotExist: RoomModel 匹配查询不存在。

好像没有创建测试室

test.py 文件如下:

class ChatTest(TestCase):

    @sync_to_async
    def set_data(self):
        room = RoomModel.objects.create_room('Room1', 'room_password_123')
        room_slug = room.slug
        user = User.objects.create_user(username='User1', password='user_password_123')
        print(RoomModel.objects.all()) # querySet here contains the created room
        return room_slug, user

    async def test_my_consumer(self):
        room_slug, user = await self.set_data()
        application = URLRouter([
            re_path(r'ws/chat/(?P<room_name>\w+)/$', ChatConsumer.as_asgi()),
        ])
        communicator = WebsocketCommunicator(application, f'/ws/chat/{room_slug}/')
        communicator.scope['user'] = user
        connected, subprotocol = await communicator.connect()
        self.assertTrue(connected)
        await communicator.send_json_to({'message': 'hello'})
        response = await communicator.receive_json_from()
        self.assertEqual('hello', response)
        await communicator.disconnect()

我的 consumer.py 文件如下:

class ChatConsumer(AsyncWebsocketConsumer):

    async def connect(self):
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        self.room_group_name = f'chat_{self.room_name}'
        await self.channel_layer.group_add(self.room_group_name, self.channel_name)
        await self.accept()

    async def disconnect(self, close_code):
        await self.channel_layer.group_discard(self.room_group_name, self.channel_name)

    async def receive(self, text_data):
        self.message = json.loads(text_data).get('message')
        data = {'type': 'chat_message'}
        data.update(await self.create_message())
        await self.channel_layer.group_send(self.room_group_name, data)

    async def chat_message(self, event):
        await self.send(text_data=json.dumps({
            'message': event['message'],
            'user': event['user'],
            'timestamp': event['timestamp']
        }))

    @database_sync_to_async
    def create_message(self):
        print(RoomModel.objects.all()) # qyerySet turns out to be empty here
        room = RoomModel.objects.get(slug=self.room_name)
        msg = room.messagemodel_set.create(text=self.message, user_name=self.scope['user'])
        return {
            'message': msg.text,
            'user': self.scope['user'].username,
            'timestamp': msg.timestamp.strftime("%d/%m/%Y, %H:%M")
        }

如有任何帮助,我将不胜感激。

我遇到了一个类似的问题,我使用 link https://github.com/django/channels/issues/1110 解决了这个问题。总而言之,您必须将 TestCase 更改为 TransactionTestCase

当您尝试 运行 room = RoomModel.objects.create_room('Room1', 'room_password_123') 时,您正在尝试提交事务。

这在测试结束之前不会发生,因为 TestCase 将每个测试包装在一个事务中。它等到测试结束时才创建这个对象。

并且由于您是 awaiting set_data,流程继续执行测试的其余部分,即它到达对 create_message 的调用,它试图获取 RoomModel 对象,它将尚未出现在数据库中,因为事务尚未提交。