Testing channels 2.0 with pytest-asyncio getting RuntimeError: Event loop is closed
Testing channels 2.0 with pytest-asyncio getting RuntimeError: Event loop is closed
我正在尝试使用 pytest-asyncio
(0.8.0) 测试新的 channels
2.0。如果我在同一个函数中放置不同的断言,例如:
import json
import pytest
from concurrent.futures._base import TimeoutError
from channels.testing import WebsocketCommunicator
from someapp.consumers import MyConsumer
@pytest.mark.django_db
@pytest.mark.asyncio
async def setup_database_and_websocket():
path = 'foo'
communicator = WebsocketCommunicator(MyConsumer, path)
connected, subprotocol = await communicator.connect()
assert connected
return communicator
@pytest.mark.django_db
@pytest.mark.asyncio
async def test_1_and_2():
communicator = await setup_database_and_websocket()
sent = json.dumps({"message": 'abc'})
await communicator.send_to(text_data=sent)
with pytest.raises(TimeoutError):
await communicator.receive_from()
await communicator.send_input({
"type": "websocket.disconnect",
"code": 1000,
})
communicator = await setup_database_and_websocket()
sent = json.dumps({"message": 1})
await communicator.send_to(text_data=sent)
with pytest.raises(TimeoutError):
await communicator.receive_from()
await communicator.send_input({
"type": "websocket.disconnect",
"code": 1000,
})
那么我没有收到错误。但是如果我像这样分开测试用例:
@pytest.mark.django_db
@pytest.mark.asyncio
async def test_1():
communicator = await setup_database_and_websocket()
sent = json.dumps({"message": 'abc'})
await communicator.send_to(text_data=sent)
with pytest.raises(TimeoutError):
await communicator.receive_from()
await communicator.send_input({
"type": "websocket.disconnect",
"code": 1000,
})
@pytest.mark.django_db
@pytest.mark.asyncio
async def test_2():
communicator = await setup_database_and_websocket()
sent = json.dumps({"message": 1})
await communicator.send_to(text_data=sent)
with pytest.raises(TimeoutError):
await communicator.receive_from()
await communicator.send_input({
"type": "websocket.disconnect",
"code": 1000,
})
然后我在第二次 receive_form
调用时收到以下错误:
with pytest.raises(TimeoutError):
> await communicator.receive_from()
someapp/tests/test_consumers_async.py:106:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/testing/websocket.py:71: in receive_from
response = await self.receive_output(timeout)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/testing.py:66: in receive_output
self.future.result()
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/consumer.py:54: in __call__
await await_many_dispatch([receive, self.channel_receive], self.dispatch)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/utils.py:48: in await_many_dispatch
await dispatch(result)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:95: in __call__
return await asyncio.wait_for(future, timeout=None)
/usr/lib/python3.6/asyncio/tasks.py:339: in wait_for
return (yield from fut)
/usr/lib/python3.6/concurrent/futures/thread.py:56: in run
result = self.fn(*self.args, **self.kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/db.py:13: in thread_handler
return super().thread_handler(loop, *args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:110: in thread_handler
return self.func(*args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/consumer.py:99: in dispatch
handler(message)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/generic/websocket.py:19: in websocket_connect
self.connect()
someapp/consumers.py:22: in connect
self.group_name, self.channel_name)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:49: in __call__
return call_result.result()
/usr/lib/python3.6/concurrent/futures/_base.py:432: in result
return self.__get_result()
/usr/lib/python3.6/concurrent/futures/_base.py:384: in __get_result
raise self._exception
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:63: in main_wrap
result = await self.awaitable(*args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels_redis/core.py:290: in group_add
await connection.expire(group_key, self.group_expiry)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/aioredis/commands/__init__.py:152: in __exit__
self._release_callback(conn)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/aioredis/pool.py:361: in release
conn.close()
../../../.virtualenvs/some_env/lib/python3.6/site-packages/aioredis/connection.py:352: in close
self._do_close(ConnectionForcedCloseError())
../../../.virtualenvs/some_env/lib/python3.6/site-packages/aioredis/connection.py:359: in _do_close
self._writer.transport.close()
/usr/lib/python3.6/asyncio/selector_events.py:621: in close
self._loop.call_soon(self._call_connection_lost, None)
/usr/lib/python3.6/asyncio/base_events.py:574: in call_soon
self._check_closed()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_UnixSelectorEventLoop running=False closed=True debug=False>
def _check_closed(self):
if self._closed:
> raise RuntimeError('Event loop is closed')
E RuntimeError: Event loop is closed
/usr/lib/python3.6/asyncio/base_events.py:357: RuntimeError
如果我这样做(如 https://channels.readthedocs.io/en/latest/topics/testing.html):
await communicator.disconnect()
而不是:
await communicator.send_input({
"type": "websocket.disconnect",
"code": 1000,
})
然后出现以下错误:
> await communicator.disconnect()
someapp/tests/test_consumers_async.py:96:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/testing/websocket.py:100: in disconnect
await self.future
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/consumer.py:54: in __call__
await await_many_dispatch([receive, self.channel_receive], self.dispatch)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/utils.py:48: in await_many_dispatch
await dispatch(result)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:95: in __call__
return await asyncio.wait_for(future, timeout=None)
/usr/lib/python3.6/asyncio/tasks.py:339: in wait_for
return (yield from fut)
/usr/lib/python3.6/concurrent/futures/thread.py:56: in run
result = self.fn(*self.args, **self.kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/db.py:13: in thread_handler
return super().thread_handler(loop, *args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:110: in thread_handler
return self.func(*args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/consumer.py:99: in dispatch
handler(message)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <someapp.consumers.ChatConsumer object at 0x7f38fcc55240>
message = {'code': 1000, 'type': 'websocket.disconnect'}
def websocket_disconnect(self, message):
"""
Called when a WebSocket connection is closed. Base level so you don't
need to call super() all the time.
"""
# TODO: group leaving
> self.disconnect(message["code"])
E TypeError: disconnect() takes 1 positional argument but 2 were given
我应该怎么做才能在各个单独的测试函数中分离这些测试用例?
编辑:断开连接错误是一个微不足道的错误 - 我忘记在子类方法中添加位置参数 (code
):
def disconnect(self, code):
AsyncToSync(self.channel_layer.group_discard)('foo', self.channel_name)
Edit2:它 was a bug 与 Redis 通道有关 - 错误修复在 github 上的最新 channels
和 asgiref
master 分支中。
这有帮助吗? (虽然我没有测试它。)
@pytest.fixture
async def communicator(db):
path = 'foo'
communicator = WebsocketCommunicator(MyConsumer, path)
connected, subprotocol = await communicator.connect()
assert connected
return communicator
@pytest.mark.asyncio
async def test_1(communicator):
sent = json.dumps({"message": 'abc'})
await communicator.send_to(text_data=sent)
with pytest.raises(TimeoutError):
await communicator.receive_from()
await communicator.send_input({
"type": "websocket.disconnect",
"code": 1000,
})
@pytest.mark.asyncio
async def test_2(communicator):
sent = json.dumps({"message": 1})
await communicator.send_to(text_data=sent)
with pytest.raises(TimeoutError):
await communicator.receive_from()
await communicator.send_input({
"type": "websocket.disconnect",
"code": 1000,
})
我正在尝试使用 pytest-asyncio
(0.8.0) 测试新的 channels
2.0。如果我在同一个函数中放置不同的断言,例如:
import json
import pytest
from concurrent.futures._base import TimeoutError
from channels.testing import WebsocketCommunicator
from someapp.consumers import MyConsumer
@pytest.mark.django_db
@pytest.mark.asyncio
async def setup_database_and_websocket():
path = 'foo'
communicator = WebsocketCommunicator(MyConsumer, path)
connected, subprotocol = await communicator.connect()
assert connected
return communicator
@pytest.mark.django_db
@pytest.mark.asyncio
async def test_1_and_2():
communicator = await setup_database_and_websocket()
sent = json.dumps({"message": 'abc'})
await communicator.send_to(text_data=sent)
with pytest.raises(TimeoutError):
await communicator.receive_from()
await communicator.send_input({
"type": "websocket.disconnect",
"code": 1000,
})
communicator = await setup_database_and_websocket()
sent = json.dumps({"message": 1})
await communicator.send_to(text_data=sent)
with pytest.raises(TimeoutError):
await communicator.receive_from()
await communicator.send_input({
"type": "websocket.disconnect",
"code": 1000,
})
那么我没有收到错误。但是如果我像这样分开测试用例:
@pytest.mark.django_db
@pytest.mark.asyncio
async def test_1():
communicator = await setup_database_and_websocket()
sent = json.dumps({"message": 'abc'})
await communicator.send_to(text_data=sent)
with pytest.raises(TimeoutError):
await communicator.receive_from()
await communicator.send_input({
"type": "websocket.disconnect",
"code": 1000,
})
@pytest.mark.django_db
@pytest.mark.asyncio
async def test_2():
communicator = await setup_database_and_websocket()
sent = json.dumps({"message": 1})
await communicator.send_to(text_data=sent)
with pytest.raises(TimeoutError):
await communicator.receive_from()
await communicator.send_input({
"type": "websocket.disconnect",
"code": 1000,
})
然后我在第二次 receive_form
调用时收到以下错误:
with pytest.raises(TimeoutError):
> await communicator.receive_from()
someapp/tests/test_consumers_async.py:106:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/testing/websocket.py:71: in receive_from
response = await self.receive_output(timeout)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/testing.py:66: in receive_output
self.future.result()
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/consumer.py:54: in __call__
await await_many_dispatch([receive, self.channel_receive], self.dispatch)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/utils.py:48: in await_many_dispatch
await dispatch(result)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:95: in __call__
return await asyncio.wait_for(future, timeout=None)
/usr/lib/python3.6/asyncio/tasks.py:339: in wait_for
return (yield from fut)
/usr/lib/python3.6/concurrent/futures/thread.py:56: in run
result = self.fn(*self.args, **self.kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/db.py:13: in thread_handler
return super().thread_handler(loop, *args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:110: in thread_handler
return self.func(*args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/consumer.py:99: in dispatch
handler(message)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/generic/websocket.py:19: in websocket_connect
self.connect()
someapp/consumers.py:22: in connect
self.group_name, self.channel_name)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:49: in __call__
return call_result.result()
/usr/lib/python3.6/concurrent/futures/_base.py:432: in result
return self.__get_result()
/usr/lib/python3.6/concurrent/futures/_base.py:384: in __get_result
raise self._exception
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:63: in main_wrap
result = await self.awaitable(*args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels_redis/core.py:290: in group_add
await connection.expire(group_key, self.group_expiry)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/aioredis/commands/__init__.py:152: in __exit__
self._release_callback(conn)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/aioredis/pool.py:361: in release
conn.close()
../../../.virtualenvs/some_env/lib/python3.6/site-packages/aioredis/connection.py:352: in close
self._do_close(ConnectionForcedCloseError())
../../../.virtualenvs/some_env/lib/python3.6/site-packages/aioredis/connection.py:359: in _do_close
self._writer.transport.close()
/usr/lib/python3.6/asyncio/selector_events.py:621: in close
self._loop.call_soon(self._call_connection_lost, None)
/usr/lib/python3.6/asyncio/base_events.py:574: in call_soon
self._check_closed()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_UnixSelectorEventLoop running=False closed=True debug=False>
def _check_closed(self):
if self._closed:
> raise RuntimeError('Event loop is closed')
E RuntimeError: Event loop is closed
/usr/lib/python3.6/asyncio/base_events.py:357: RuntimeError
如果我这样做(如 https://channels.readthedocs.io/en/latest/topics/testing.html):
await communicator.disconnect()
而不是:
await communicator.send_input({
"type": "websocket.disconnect",
"code": 1000,
})
然后出现以下错误:
> await communicator.disconnect()
someapp/tests/test_consumers_async.py:96:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/testing/websocket.py:100: in disconnect
await self.future
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/consumer.py:54: in __call__
await await_many_dispatch([receive, self.channel_receive], self.dispatch)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/utils.py:48: in await_many_dispatch
await dispatch(result)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:95: in __call__
return await asyncio.wait_for(future, timeout=None)
/usr/lib/python3.6/asyncio/tasks.py:339: in wait_for
return (yield from fut)
/usr/lib/python3.6/concurrent/futures/thread.py:56: in run
result = self.fn(*self.args, **self.kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/db.py:13: in thread_handler
return super().thread_handler(loop, *args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/asgiref/sync.py:110: in thread_handler
return self.func(*args, **kwargs)
../../../.virtualenvs/some_env/lib/python3.6/site-packages/channels/consumer.py:99: in dispatch
handler(message)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <someapp.consumers.ChatConsumer object at 0x7f38fcc55240>
message = {'code': 1000, 'type': 'websocket.disconnect'}
def websocket_disconnect(self, message):
"""
Called when a WebSocket connection is closed. Base level so you don't
need to call super() all the time.
"""
# TODO: group leaving
> self.disconnect(message["code"])
E TypeError: disconnect() takes 1 positional argument but 2 were given
我应该怎么做才能在各个单独的测试函数中分离这些测试用例?
编辑:断开连接错误是一个微不足道的错误 - 我忘记在子类方法中添加位置参数 (code
):
def disconnect(self, code):
AsyncToSync(self.channel_layer.group_discard)('foo', self.channel_name)
Edit2:它 was a bug 与 Redis 通道有关 - 错误修复在 github 上的最新 channels
和 asgiref
master 分支中。
这有帮助吗? (虽然我没有测试它。)
@pytest.fixture
async def communicator(db):
path = 'foo'
communicator = WebsocketCommunicator(MyConsumer, path)
connected, subprotocol = await communicator.connect()
assert connected
return communicator
@pytest.mark.asyncio
async def test_1(communicator):
sent = json.dumps({"message": 'abc'})
await communicator.send_to(text_data=sent)
with pytest.raises(TimeoutError):
await communicator.receive_from()
await communicator.send_input({
"type": "websocket.disconnect",
"code": 1000,
})
@pytest.mark.asyncio
async def test_2(communicator):
sent = json.dumps({"message": 1})
await communicator.send_to(text_data=sent)
with pytest.raises(TimeoutError):
await communicator.receive_from()
await communicator.send_input({
"type": "websocket.disconnect",
"code": 1000,
})