Django-Channels 2 Communicator receive_output() 不起作用
Django-Channels 2 Communicator receive_output() doesn't work
我有一个基本的消费者,它从 websocket 接收消息,然后将这些消息广播到消息中指定的频道。
consumers.py
class BasicConsumer(AsyncJsonWebsocketConsumer):
    """Relay each JSON message received on the websocket to a channel.

    The destination channel is named by the message itself.
    """

    async def receive_json(self, content, **kwargs):
        """Forward ``content`` through the channel layer.

        The target channel is read from ``content['channel']``; the whole
        decoded payload is carried along as the event's ``body``.
        """
        target = content['channel']
        event = {
            'type': 'some-message-type',
            'body': content,
        }
        await self.channel_layer.send(target, event)
然后我想像这样用 pytest 和 pytest-asyncio 测试它
# Async test (run via pytest-asyncio): sends a JSON payload to BasicConsumer
# over a test websocket and checks the event the consumer forwards.
@pytest.mark.asyncio
async def test_basic_consumer():
channel_layer = get_channel_layer()
# connect to consumer
communicator = WebsocketCommunicator(BasicConsumer, "/my-url/")
connected, subprotocol = await communicator.connect()
assert connected
# generate valid channel name
channel_name = await channel_layer.new_channel()
# ask the consumer to relay this payload to channel_name
await communicator.send_json_to({
'channel': channel_name,
'data': 'some-data',
})
# NOTE(review): this is the call that raises TimeoutError in the traceback;
# receive_output() appears to wait for messages the consumer sends over the
# websocket, not for events sent to the channel layer.
from_channel_layer = await communicator.receive_output(timeout=5)
# expected event: the consumer's fixed 'type' plus the original payload as 'body'
assert from_channel_layer == {
'type': 'some-message-type',
'body': {
'channel': channel_name,
'data': 'some-data',
},
}
我的理解是communicator.receive_output()
会获取消费者发送到通道层的事件。但是我收到以下错误:
___________________________ test_basic_consumer ___________________________
self = <channels.testing.websocket.WebsocketCommunicator object at 0x7f901b5a87b8>
timeout = 5
async def receive_output(self, timeout=1):
"""
Receives a single message from the application, with optional timeout.
"""
# Make sure there's not an exception to raise from the task
if self.future.done():
self.future.result()
# Wait and receive the message
try:
async with async_timeout(timeout):
> return await self.output_queue.get()
vp3test/lib/python3.7/site-packages/asgiref/testing.py:75:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Queue at 0x7f901b5a8898 maxsize=0 tasks=1>
async def get(self):
"""Remove and return an item from the queue.
If queue is empty, wait until an item is available.
"""
while self.empty():
getter = self._loop.create_future()
self._getters.append(getter)
try:
> await getter
E concurrent.futures._base.CancelledError
/usr/lib/python3.7/asyncio/queues.py:159: CancelledError
During handling of the above exception, another exception occurred:
@pytest.mark.asyncio
async def test_basic_consumer():
channel_layer = get_channel_layer()
# connect to consumer
communicator = WebsocketCommunicator(BasicConsumer, "/my-url/")
connected, subprotocol = await communicator.connect()
assert connected
# generate valid channel name
channel_name = await channel_layer.new_channel()
await communicator.send_json_to({
'channel': channel_name,
'data': 'some-data',
})
> from_channel_layer = await communicator.receive_output(timeout=5)
test_consumers.py:150:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
vp3test/lib/python3.7/site-packages/asgiref/testing.py:86: in receive_output
raise e
vp3test/lib/python3.7/site-packages/asgiref/testing.py:75: in receive_output
return await self.output_queue.get()
vp3test/lib/python3.7/site-packages/asgiref/timeout.py:68: in __aexit__
self._do_exit(exc_type)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <asgiref.timeout.timeout object at 0x7f901b5a8ba8>
exc_type = <class 'concurrent.futures._base.CancelledError'>
def _do_exit(self, exc_type: Type[BaseException]) -> None:
if exc_type is asyncio.CancelledError and self._cancelled:
self._cancel_handler = None
self._task = None
> raise asyncio.TimeoutError
E concurrent.futures._base.TimeoutError
vp3test/lib/python3.7/site-packages/asgiref/timeout.py:105: TimeoutError
我的测试还需要做一些 setup/configuration 吗?
还是我用的 receive_output()
错了?
我在测试中使用的是 InMemoryChannelLayer,
所有相关软件包均为最新发布的版本。
我的问题在于我调用的是
from_channel_layer = await communicator.receive_output(timeout=5)
它等待的是消费者通过 websocket 发回的消息。
我可以改为调用
from_channel_layer = await channel_layer.receive(channel_name)
来获取消费者通过通道层发送的消息。
我有一个基本的消费者,它从 websocket 接收消息,然后将这些消息广播到消息中指定的频道。
consumers.py
class BasicConsumer(AsyncJsonWebsocketConsumer):
    """Relay each JSON message received on the websocket to a channel.

    The destination channel is named by the message itself.
    """

    async def receive_json(self, content, **kwargs):
        """Forward ``content`` through the channel layer.

        The target channel is read from ``content['channel']``; the whole
        decoded payload is carried along as the event's ``body``.
        """
        target = content['channel']
        event = {
            'type': 'some-message-type',
            'body': content,
        }
        await self.channel_layer.send(target, event)
然后我想像这样用 pytest 和 pytest-asyncio 测试它
# Async test (run via pytest-asyncio): sends a JSON payload to BasicConsumer
# over a test websocket and checks the event the consumer forwards.
@pytest.mark.asyncio
async def test_basic_consumer():
channel_layer = get_channel_layer()
# connect to consumer
communicator = WebsocketCommunicator(BasicConsumer, "/my-url/")
connected, subprotocol = await communicator.connect()
assert connected
# generate valid channel name
channel_name = await channel_layer.new_channel()
# ask the consumer to relay this payload to channel_name
await communicator.send_json_to({
'channel': channel_name,
'data': 'some-data',
})
# NOTE(review): this is the call that raises TimeoutError in the traceback;
# receive_output() appears to wait for messages the consumer sends over the
# websocket, not for events sent to the channel layer.
from_channel_layer = await communicator.receive_output(timeout=5)
# expected event: the consumer's fixed 'type' plus the original payload as 'body'
assert from_channel_layer == {
'type': 'some-message-type',
'body': {
'channel': channel_name,
'data': 'some-data',
},
}
我的理解是communicator.receive_output()
会获取消费者发送到通道层的事件。但是我收到以下错误:
___________________________ test_basic_consumer ___________________________
self = <channels.testing.websocket.WebsocketCommunicator object at 0x7f901b5a87b8>
timeout = 5
async def receive_output(self, timeout=1):
"""
Receives a single message from the application, with optional timeout.
"""
# Make sure there's not an exception to raise from the task
if self.future.done():
self.future.result()
# Wait and receive the message
try:
async with async_timeout(timeout):
> return await self.output_queue.get()
vp3test/lib/python3.7/site-packages/asgiref/testing.py:75:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Queue at 0x7f901b5a8898 maxsize=0 tasks=1>
async def get(self):
"""Remove and return an item from the queue.
If queue is empty, wait until an item is available.
"""
while self.empty():
getter = self._loop.create_future()
self._getters.append(getter)
try:
> await getter
E concurrent.futures._base.CancelledError
/usr/lib/python3.7/asyncio/queues.py:159: CancelledError
During handling of the above exception, another exception occurred:
@pytest.mark.asyncio
async def test_basic_consumer():
channel_layer = get_channel_layer()
# connect to consumer
communicator = WebsocketCommunicator(BasicConsumer, "/my-url/")
connected, subprotocol = await communicator.connect()
assert connected
# generate valid channel name
channel_name = await channel_layer.new_channel()
await communicator.send_json_to({
'channel': channel_name,
'data': 'some-data',
})
> from_channel_layer = await communicator.receive_output(timeout=5)
test_consumers.py:150:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
vp3test/lib/python3.7/site-packages/asgiref/testing.py:86: in receive_output
raise e
vp3test/lib/python3.7/site-packages/asgiref/testing.py:75: in receive_output
return await self.output_queue.get()
vp3test/lib/python3.7/site-packages/asgiref/timeout.py:68: in __aexit__
self._do_exit(exc_type)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <asgiref.timeout.timeout object at 0x7f901b5a8ba8>
exc_type = <class 'concurrent.futures._base.CancelledError'>
def _do_exit(self, exc_type: Type[BaseException]) -> None:
if exc_type is asyncio.CancelledError and self._cancelled:
self._cancel_handler = None
self._task = None
> raise asyncio.TimeoutError
E concurrent.futures._base.TimeoutError
vp3test/lib/python3.7/site-packages/asgiref/timeout.py:105: TimeoutError
我的测试还需要做一些 setup/configuration 吗?
还是我用的 receive_output()
错了?
我在测试中使用的是 InMemoryChannelLayer,
所有相关软件包均为最新发布的版本。
我的问题在于我调用的是
from_channel_layer = await communicator.receive_output(timeout=5)
它等待的是消费者通过 websocket 发回的消息。
我可以改为调用
from_channel_layer = await channel_layer.receive(channel_name)
来获取消费者通过通道层发送的消息。