在测试我的 Websocket 消费者时,未创建模型对象(Django Channels)
While testing my Websocket Consumer a model object is not created (Django Channels)
我是 Django Channels 的新手,我正在尝试构建一个简单的聊天应用程序。但是当我尝试测试我的异步 Websocket 消费者时,我 运行 出现以下异常:chat.models.RoomModel.DoesNotExist: RoomModel 匹配查询不存在。
好像没有创建测试室
test.py 文件如下:
class ChatTest(TestCase):
@sync_to_async
def set_data(self):
room = RoomModel.objects.create_room('Room1', 'room_password_123')
room_slug = room.slug
user = User.objects.create_user(username='User1', password='user_password_123')
print(RoomModel.objects.all()) # querySet here contains the created room
return room_slug, user
async def test_my_consumer(self):
room_slug, user = await self.set_data()
application = URLRouter([
re_path(r'ws/chat/(?P<room_name>\w+)/$', ChatConsumer.as_asgi()),
])
communicator = WebsocketCommunicator(application, f'/ws/chat/{room_slug}/')
communicator.scope['user'] = user
connected, subprotocol = await communicator.connect()
self.assertTrue(connected)
await communicator.send_json_to({'message': 'hello'})
response = await communicator.receive_json_from()
self.assertEqual('hello', response)
await communicator.disconnect()
我的 consumer.py 文件如下:
class ChatConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.room_name = self.scope['url_route']['kwargs']['room_name']
self.room_group_name = f'chat_{self.room_name}'
await self.channel_layer.group_add(self.room_group_name, self.channel_name)
await self.accept()
async def disconnect(self, close_code):
await self.channel_layer.group_discard(self.room_group_name, self.channel_name)
async def receive(self, text_data):
self.message = json.loads(text_data).get('message')
data = {'type': 'chat_message'}
data.update(await self.create_message())
await self.channel_layer.group_send(self.room_group_name, data)
async def chat_message(self, event):
await self.send(text_data=json.dumps({
'message': event['message'],
'user': event['user'],
'timestamp': event['timestamp']
}))
@database_sync_to_async
def create_message(self):
print(RoomModel.objects.all()) # qyerySet turns out to be empty here
room = RoomModel.objects.get(slug=self.room_name)
msg = room.messagemodel_set.create(text=self.message, user_name=self.scope['user'])
return {
'message': msg.text,
'user': self.scope['user'].username,
'timestamp': msg.timestamp.strftime("%d/%m/%Y, %H:%M")
}
如有任何帮助,我将不胜感激。
我遇到了一个类似的问题,我使用 link https://github.com/django/channels/issues/1110 解决了这个问题。总而言之,您必须将 TestCase 更改为 TransactionTestCase
当您尝试 运行 room = RoomModel.objects.create_room('Room1', 'room_password_123')
时,您正在尝试提交事务。
这在测试结束之前不会发生,因为 TestCase 将每个测试包装在一个事务中。它等到测试结束时才创建这个对象。
并且由于您是 awaiting set_data
,流程继续执行测试的其余部分,即它到达对 create_message
的调用,它试图获取 RoomModel 对象,它将尚未出现在数据库中,因为事务尚未提交。
我是 Django Channels 的新手,我正在尝试构建一个简单的聊天应用程序。但是当我尝试测试我的异步 Websocket 消费者时,我 运行 出现以下异常:chat.models.RoomModel.DoesNotExist: RoomModel 匹配查询不存在。
好像没有创建测试室
test.py 文件如下:
class ChatTest(TestCase):
@sync_to_async
def set_data(self):
room = RoomModel.objects.create_room('Room1', 'room_password_123')
room_slug = room.slug
user = User.objects.create_user(username='User1', password='user_password_123')
print(RoomModel.objects.all()) # querySet here contains the created room
return room_slug, user
async def test_my_consumer(self):
room_slug, user = await self.set_data()
application = URLRouter([
re_path(r'ws/chat/(?P<room_name>\w+)/$', ChatConsumer.as_asgi()),
])
communicator = WebsocketCommunicator(application, f'/ws/chat/{room_slug}/')
communicator.scope['user'] = user
connected, subprotocol = await communicator.connect()
self.assertTrue(connected)
await communicator.send_json_to({'message': 'hello'})
response = await communicator.receive_json_from()
self.assertEqual('hello', response)
await communicator.disconnect()
我的 consumer.py 文件如下:
class ChatConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.room_name = self.scope['url_route']['kwargs']['room_name']
self.room_group_name = f'chat_{self.room_name}'
await self.channel_layer.group_add(self.room_group_name, self.channel_name)
await self.accept()
async def disconnect(self, close_code):
await self.channel_layer.group_discard(self.room_group_name, self.channel_name)
async def receive(self, text_data):
self.message = json.loads(text_data).get('message')
data = {'type': 'chat_message'}
data.update(await self.create_message())
await self.channel_layer.group_send(self.room_group_name, data)
async def chat_message(self, event):
await self.send(text_data=json.dumps({
'message': event['message'],
'user': event['user'],
'timestamp': event['timestamp']
}))
@database_sync_to_async
def create_message(self):
print(RoomModel.objects.all()) # qyerySet turns out to be empty here
room = RoomModel.objects.get(slug=self.room_name)
msg = room.messagemodel_set.create(text=self.message, user_name=self.scope['user'])
return {
'message': msg.text,
'user': self.scope['user'].username,
'timestamp': msg.timestamp.strftime("%d/%m/%Y, %H:%M")
}
如有任何帮助,我将不胜感激。
我遇到了一个类似的问题,我使用 link https://github.com/django/channels/issues/1110 解决了这个问题。总而言之,您必须将 TestCase 更改为 TransactionTestCase
当您尝试 运行 room = RoomModel.objects.create_room('Room1', 'room_password_123')
时,您正在尝试提交事务。
这在测试结束之前不会发生,因为 TestCase 将每个测试包装在一个事务中。它等到测试结束时才创建这个对象。
并且由于您是 awaiting set_data
,流程继续执行测试的其余部分,即它到达对 create_message
的调用,它试图获取 RoomModel 对象,它将尚未出现在数据库中,因为事务尚未提交。