如何在 Django 频道中为消费者编写单元测试?
How to write unittests for Consumers in django channels?
我是 django-channels
的初学者。我正在使用 channels 2.3.1
。我有以下代码。
主路由
websocket_urlpatterns = [
url(r'^ws/events/(?P<room_type>[^/]+)/(?P<room_id>[^/]+)/$', consumers.Consumer, name='core-consumer'),
]
应用路由
application = ProtocolTypeRouter({
'websocket': URLRouter(core.routing.websocket_urlpatterns),
})
消费者
class Consumer(AsyncWebsocketConsumer):
async def connect(self):
room_type = self.scope['url_route']['kwargs']['room_type']
room_id = self.scope['url_route']['kwargs']['room_id']
self.room_group_name = f'{room_type}_{room_id}'
await self.channel_layer.group_add(self.room_group_name, self.channel_name)
await self.accept()
async def receive(self, text_data = None, bytes_data = None):
text_data_json = json.loads(text_data)
message = text_data_json['message']
await self.channel_layer.group_send(self.room_group_name,
{'type': 'send_data', 'message': message}
)
async def disconnect(self, close_code):
await self.channel_layer.group_discard(self.room_group_name, self.channel_name)
async def send_data(self, event):
message = event['message']
await self.send(text_data=json.dumps({
'data': message
}))
def send_event(channel='', message=''):
channel_layer = get_channel_layer()
asyncio.run(channel_layer.group_send(channel,
{
'type': 'send_data',
'message': message
}
))
信号
def activate_notification(self):
send_event(f'user_{self.id}',
{'message': f"Welcome back to one deeds!"})
如何使用 unittest
测试 consumer
中的每个函数?
我读了这个 documentation 但没看懂(
我想 运行 使用命令 ./manage.py test
进行测试
我尝试使用 WebsocketCommunicator
进行测试
我会提出一些建议。
1) 使用channels.layers.InMemoryChannelLayer
层。
2) 使用pytest-asyncio
这样你的测试代码就可以是async
方法
@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_get():
....
到运行这些测试你会用pytest
3) 使用WebsocketCommunicator
检查这些examples。
我是 django-channels
的初学者。我正在使用 channels 2.3.1
。我有以下代码。
主路由
websocket_urlpatterns = [
url(r'^ws/events/(?P<room_type>[^/]+)/(?P<room_id>[^/]+)/$', consumers.Consumer, name='core-consumer'),
]
应用路由
application = ProtocolTypeRouter({
'websocket': URLRouter(core.routing.websocket_urlpatterns),
})
消费者
class Consumer(AsyncWebsocketConsumer):
async def connect(self):
room_type = self.scope['url_route']['kwargs']['room_type']
room_id = self.scope['url_route']['kwargs']['room_id']
self.room_group_name = f'{room_type}_{room_id}'
await self.channel_layer.group_add(self.room_group_name, self.channel_name)
await self.accept()
async def receive(self, text_data = None, bytes_data = None):
text_data_json = json.loads(text_data)
message = text_data_json['message']
await self.channel_layer.group_send(self.room_group_name,
{'type': 'send_data', 'message': message}
)
async def disconnect(self, close_code):
await self.channel_layer.group_discard(self.room_group_name, self.channel_name)
async def send_data(self, event):
message = event['message']
await self.send(text_data=json.dumps({
'data': message
}))
def send_event(channel='', message=''):
channel_layer = get_channel_layer()
asyncio.run(channel_layer.group_send(channel,
{
'type': 'send_data',
'message': message
}
))
信号
def activate_notification(self):
send_event(f'user_{self.id}',
{'message': f"Welcome back to one deeds!"})
如何使用 unittest
测试 consumer
中的每个函数?
我读了这个 documentation 但没看懂(
我想 运行 使用命令 ./manage.py test
我尝试使用 WebsocketCommunicator
我会提出一些建议。
1) 使用channels.layers.InMemoryChannelLayer
层。
2) 使用pytest-asyncio
这样你的测试代码就可以是async
方法
@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_get():
....
到运行这些测试你会用pytest
3) 使用WebsocketCommunicator
检查这些examples。