Testing channels 2.0 with pytest-asyncio getting RuntimeError: Event loop is closed

Testing channels 2.0 with pytest-asyncio getting RuntimeError: Event loop is closed

我正在尝试使用 pytest-asyncio (0.8.0) 测试新的 channels 2.0。如果我在同一个函数中放置不同的断言,例如:

import json
import pytest
from concurrent.futures._base import TimeoutError
from channels.testing import WebsocketCommunicator
from someapp.consumers import MyConsumer


@pytest.mark.django_db
@pytest.mark.asyncio
async def setup_database_and_websocket():
    """Connect a ``WebsocketCommunicator`` to ``MyConsumer`` and return it.

    The handshake result is asserted, so a rejected connection fails right
    here instead of later in the calling test.
    """
    # NOTE(review): this is a helper awaited by the tests, not a test itself;
    # the marks above are presumably redundant here - confirm.
    ws = WebsocketCommunicator(MyConsumer, 'foo')
    accepted, _subprotocol = await ws.connect()
    assert accepted
    return ws


@pytest.mark.django_db
@pytest.mark.asyncio
async def test_1_and_2():
    """Exercise both message scenarios one after the other in a single test.

    For each message a fresh communicator is opened, the JSON-encoded message
    is sent, ``receive_from()`` is expected to raise ``TimeoutError``, and a
    ``websocket.disconnect`` event is then fed to the consumer.
    """
    for message in ('abc', 1):
        ws = await setup_database_and_websocket()
        await ws.send_to(text_data=json.dumps({"message": message}))
        # No reply is expected, so waiting for one has to time out.
        with pytest.raises(TimeoutError):
            await ws.receive_from()
        disconnect_event = {"type": "websocket.disconnect", "code": 1000}
        await ws.send_input(disconnect_event)

那么我没有收到错误。但是如果我像这样分开测试用例:

@pytest.mark.django_db
@pytest.mark.asyncio
async def test_1():
    """Send a string message and expect no reply before the timeout."""
    ws = await setup_database_and_websocket()
    payload = json.dumps({"message": 'abc'})
    await ws.send_to(text_data=payload)
    # No reply is expected, so waiting for one has to time out.
    with pytest.raises(TimeoutError):
        await ws.receive_from()
    # Feed the disconnect event straight to the consumer.
    disconnect_event = {"type": "websocket.disconnect", "code": 1000}
    await ws.send_input(disconnect_event)


@pytest.mark.django_db
@pytest.mark.asyncio
async def test_2():
    """Send an integer message and expect no reply before the timeout."""
    ws = await setup_database_and_websocket()
    payload = json.dumps({"message": 1})
    await ws.send_to(text_data=payload)
    # No reply is expected, so waiting for one has to time out.
    with pytest.raises(TimeoutError):
        await ws.receive_from()
    # Feed the disconnect event straight to the consumer.
    disconnect_event = {"type": "websocket.disconnect", "code": 1000}
    await ws.send_input(disconnect_event)

然后我在第二次调用 receive_from 时收到以下错误:

with pytest.raises(TimeoutError):
>           await communicator.receive_from()

someapp/tests/test_consumers_async.py:106: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/testing/websocket.py:71: in receive_from
response = await self.receive_output(timeout)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/testing.py:66: in receive_output
self.future.result()
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/consumer.py:54: in __call__
await await_many_dispatch([receive, self.channel_receive], self.dispatch)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/utils.py:48: in await_many_dispatch
await dispatch(result)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:95: in __call__
return await asyncio.wait_for(future, timeout=None)
/usr/lib/python3.6/asyncio/tasks.py:339: in wait_for
return (yield from fut)
/usr/lib/python3.6/concurrent/futures/thread.py:56: in run
result = self.fn(*self.args, **self.kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/db.py:13: in thread_handler
return super().thread_handler(loop, *args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:110: in thread_handler
return self.func(*args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/consumer.py:99: in dispatch
handler(message)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/generic/websocket.py:19: in websocket_connect
self.connect()
someapp/consumers.py:22: in connect
self.group_name, self.channel_name)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:49: in __call__
return call_result.result()
/usr/lib/python3.6/concurrent/futures/_base.py:432: in result
return self.__get_result()
/usr/lib/python3.6/concurrent/futures/_base.py:384: in __get_result
raise self._exception
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:63: in main_wrap
result = await self.awaitable(*args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels_redis/core.py:290: in group_add
await connection.expire(group_key, self.group_expiry)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/aioredis/commands/__init__.py:152: in __exit__
self._release_callback(conn)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/aioredis/pool.py:361: in release
conn.close()
../../../.virtualenvs/some_env/lib/python3.6/site-packages/aioredis/connection.py:352: in close
self._do_close(ConnectionForcedCloseError())
../../../.virtualenvs/some_env/lib/python3.6/site-packages/aioredis/connection.py:359: in _do_close
self._writer.transport.close()
/usr/lib/python3.6/asyncio/selector_events.py:621: in close
self._loop.call_soon(self._call_connection_lost, None)
/usr/lib/python3.6/asyncio/base_events.py:574: in call_soon
self._check_closed()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_UnixSelectorEventLoop running=False closed=True debug=False>

def _check_closed(self):
if self._closed:
>           raise RuntimeError('Event loop is closed')
E           RuntimeError: Event loop is closed

/usr/lib/python3.6/asyncio/base_events.py:357: RuntimeError

如果我这样做(如 https://channels.readthedocs.io/en/latest/topics/testing.html 中所述):

await communicator.disconnect()

而不是:

await communicator.send_input({
    "type": "websocket.disconnect",
    "code": 1000,
})

然后出现以下错误:

>       await communicator.disconnect()

someapp/tests/test_consumers_async.py:96: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/testing/websocket.py:100: in disconnect
    await self.future
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/consumer.py:54: in __call__
    await await_many_dispatch([receive, self.channel_receive], self.dispatch)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/utils.py:48: in await_many_dispatch
    await dispatch(result)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:95: in __call__
    return await asyncio.wait_for(future, timeout=None)
/usr/lib/python3.6/asyncio/tasks.py:339: in wait_for
    return (yield from fut)
/usr/lib/python3.6/concurrent/futures/thread.py:56: in run
    result = self.fn(*self.args, **self.kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/db.py:13: in thread_handler
    return super().thread_handler(loop, *args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:110: in thread_handler
    return self.func(*args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/consumer.py:99: in dispatch
    handler(message)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <someapp.consumers.ChatConsumer object at 0x7f38fcc55240>
message = {'code': 1000, 'type': 'websocket.disconnect'}

    def websocket_disconnect(self, message):
        """
            Called when a WebSocket connection is closed. Base level so you don't
            need to call super() all the time.
            """
        # TODO: group leaving
>       self.disconnect(message["code"])
E       TypeError: disconnect() takes 1 positional argument but 2 were given

我应该怎么做才能在各个单独的测试函数中分离这些测试用例?


编辑:断开连接错误是一个微不足道的错误 - 我忘记在子类方法中添加位置参数 (code):

def disconnect(self, code):
    """Remove this consumer's channel from the 'foo' group.

    ``code`` is accepted so the signature matches what the base class passes
    in; it is not used here.
    """
    discard_from_group = AsyncToSync(self.channel_layer.group_discard)
    discard_from_group('foo', self.channel_name)

编辑 2:这是一个与 Redis 通道有关的 bug —— 修复已包含在 GitHub 上 channels/asgiref 的最新 master 分支中。

这有帮助吗? (虽然我没有测试它。)

@pytest.fixture
async def communicator(db):
    """Provide a ``WebsocketCommunicator`` already connected to ``MyConsumer``.

    Requests the ``db`` fixture (presumably pytest-django's database access
    fixture - confirm) and asserts that the handshake was accepted.
    """
    ws = WebsocketCommunicator(MyConsumer, 'foo')
    accepted, _subprotocol = await ws.connect()
    assert accepted
    return ws


@pytest.mark.asyncio
async def test_1(communicator):
    """Send a string message over the fixture's connection; expect a timeout."""
    await communicator.send_to(text_data=json.dumps({"message": 'abc'}))
    # No reply is expected, so waiting for one has to time out.
    with pytest.raises(TimeoutError):
        await communicator.receive_from()
    disconnect_event = {"type": "websocket.disconnect", "code": 1000}
    await communicator.send_input(disconnect_event)


@pytest.mark.asyncio
async def test_2(communicator):
    """Send an integer message over the fixture's connection; expect a timeout."""
    await communicator.send_to(text_data=json.dumps({"message": 1}))
    # No reply is expected, so waiting for one has to time out.
    with pytest.raises(TimeoutError):
        await communicator.receive_from()
    disconnect_event = {"type": "websocket.disconnect", "code": 1000}
    await communicator.send_input(disconnect_event)