使用 WebSocketCommunicator 在 Django Channels v2 测试中进行身份验证

Authentication in Django Channels v2 tests with WebSocketCommunicator

在为我的聊天用户编写测试的过程中,我遇到了无法在测试中使用 WebSocketCommunicator 进行身份验证的问题。我有自定义 JwtTokenAuthMiddleware,它通过在请求查询中使用令牌在套接字中实现身份验证,因为据我所知,使用授权 headers 进行体面的身份验证尚不可能。你们能否就此向我提出建议或向我提供我在网上找不到的示例代码?顺便说一句,我的聊天没有问题。测试也应该完全没问题,我从官方文档 Django Channels 2.x Testing.

中获取了指南

--JwtTokenAuthMiddlewate--

class JwtTokenAuthMiddleware:
    def __init__(self, inner):
        self.inner = inner

    def __call__(self, scope):
        close_old_connections()
        try:
            raw_token = scope['query_string'].decode().split('=')[1]
            auth = JWTAuthentication()
            validated_token = auth.get_validated_token(raw_token)
            user = auth.get_user(validated_token)
            scope['user'] = user
        except (IndexError, InvalidToken, AuthenticationFailed):
            scope['user'] = AnonymousUser()
        return self.inner(scope)


JwtTokenAuthMiddlewareStack = lambda inner: JwtTokenAuthMiddleware(AuthMiddlewareStack(inner))

--实例测试--

@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_trainer_auth_success():
    room = await database_sync_to_async(RoomFactory.create)()
    trainer = room.trainer
    trainer_token = await sync_to_async(get_token_for_user)(trainer.user)
    room_url = f'ws/room/{room.id}/'

    trainer_communicator = WebsocketCommunicator(application, f'{room_url}?t={trainer_token}')
    connected, _ = await trainer_communicator.connect()
    assert connected

    trainer_connect_resp = await trainer_communicator.receive_json_from()
    assert_connection(trainer_connect_resp, [], room.max_round_time)
    await trainer_communicator.disconnect()

--错误回溯--

___________________________________________________________ test_trainer_auth_success ___________________________________________________________

self = <channels.testing.websocket.WebsocketCommunicator object at 0x7f6b9906f290>, timeout = 1

    async def receive_output(self, timeout=1):
        """
        Receives a single message from the application, with optional timeout.
        """
        # Make sure there's not an exception to raise from the task
        if self.future.done():
            self.future.result()
        # Wait and receive the message
        try:
            async with async_timeout(timeout):
>               return await self.output_queue.get()

/usr/local/lib/python3.7/site-packages/asgiref/testing.py:74: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <Queue at 0x7f6b98f76510 maxsize=0>

    async def get(self):
        """Remove and return an item from the queue.

        If queue is empty, wait until an item is available.
        """
        while self.empty():
            getter = self._loop.create_future()
            self._getters.append(getter)
            try:
>               await getter
E               concurrent.futures._base.CancelledError

/usr/local/lib/python3.7/asyncio/queues.py:159: CancelledError

During handling of the above exception, another exception occurred:

    @pytest.mark.django_db(transaction=True)
    @pytest.mark.asyncio
    async def test_trainer_auth_success():
        room = await database_sync_to_async(RoomFactory.create)()
        trainer = room.trainer
        trainer_token = await sync_to_async(get_token_for_user)(trainer.user)
        room_url = f'ws/room/{room.id}/'

        # trainer_communicator = await assert_get_connected_communicator(application, room_url, trainer_token)
        trainer_communicator = WebsocketCommunicator(application, f'{room_url}?t={trainer_token}')
>       connected, _ = await trainer_communicator.connect()

apps/chat/tests/test_consumers.py:39: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/local/lib/python3.7/site-packages/channels/testing/websocket.py:36: in connect
    response = await self.receive_output(timeout)
/usr/local/lib/python3.7/site-packages/asgiref/testing.py:85: in receive_output
    raise e
/usr/local/lib/python3.7/site-packages/asgiref/testing.py:74: in receive_output
    return await self.output_queue.get()
/usr/local/lib/python3.7/site-packages/asgiref/timeout.py:68: in __aexit__
    self._do_exit(exc_type)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <asgiref.timeout.timeout object at 0x7f6b98f76d50>, exc_type = <class 'concurrent.futures._base.CancelledError'>

    def _do_exit(self, exc_type: Type[BaseException]) -> None:
        if exc_type is asyncio.CancelledError and self._cancelled:
            self._cancel_handler = None
            self._task = None
>           raise asyncio.TimeoutError
E           concurrent.futures._base.TimeoutError

/usr/local/lib/python3.7/site-packages/asgiref/timeout.py:105: TimeoutError
=============================================================== warnings summary ================================================================
/usr/local/lib/python3.7/site-packages/pytest_asyncio/plugin.py:39
  /usr/local/lib/python3.7/site-packages/pytest_asyncio/plugin.py:39: PytestDeprecationWarning: direct construction of Function has been deprecated, please use Function.from_parent
    item = pytest.Function(name, parent=collector)

/usr/local/lib/python3.7/site-packages/pytest_asyncio/plugin.py:45
  /usr/local/lib/python3.7/site-packages/pytest_asyncio/plugin.py:45: PytestDeprecationWarning: direct construction of Function has been deprecated, please use Function.from_parent
    item = pytest.Function(name, parent=collector)  # To reload keywords.

我一直在尝试做或多或少相同的事情,但似乎没有简单的方法可以在测试消费者和传播者时对用户进行身份验证。 There is 关于此主题的 GH 问题,其中提供了一些(可行的!)解决方法,也许您会觉得有用。