如何在 class 中实现 asyncio websockets?

How can I implement asyncio websockets in a class?

我想通过 asynciowebsockets 连接到一个 websocket,格式如下所示。我怎样才能做到这一点?

from websockets import connect


class EchoWebsocket:

    def __init__(self):
        self.websocket = self._connect()

    def _connect(self):
        return connect("wss://echo.websocket.org")

    def send(self, message):
        self.websocket.send(message)

    def receive(self):
        return self.websocket.recv()

echo = EchoWebsocket()
echo.send("Hello!")
print(echo.receive())  # "Hello!"

如何编写异步程序?

  1. 您应该使用 async
  2. 定义异步函数
  3. 你应该调用异步 函数 await
  4. 您需要 event loop 才能启动您的异步程序

所有其他几乎与常规 Python 程序相同。

import asyncio
from websockets import connect


class EchoWebsocket:
    async def __aenter__(self):
        self._conn = connect("wss://echo.websocket.org")
        self.websocket = await self._conn.__aenter__()        
        return self

    async def __aexit__(self, *args, **kwargs):
        await self._conn.__aexit__(*args, **kwargs)

    async def send(self, message):
        await self.websocket.send(message)

    async def receive(self):
        return await self.websocket.recv()


async def main():
    async with EchoWebsocket() as echo:
        await echo.send("Hello!")
        print(await echo.receive())  # "Hello!"


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

输出:

Hello!

如你所见,代码和你写的差不多。

唯一的区别是 websockets.connect 设计为异步上下文管理器(它使用 __aenter____aexit__)。释放连接是必要的,也将帮助您在 class 初始化期间进行异步操作(因为我们没有 __init__ 的异步版本)。

我建议您以同样的方式组织您的 class。但是如果出于某种原因你真的不想使用上下文管理器,你可以使用新的 __await__ 方法进行异步初始化和一些其他异步函数来释放连接:

import sys
import asyncio
from websockets import connect


class EchoWebsocket:
    def __await__(self):
        # see: 
        return self._async_init().__await__()

    async def _async_init(self):
        self._conn = connect("wss://echo.websocket.org")
        self.websocket = await self._conn.__aenter__()
        return self

    async def close(self):
        await self._conn.__aexit__(*sys.exc_info())

    async def send(self, message):
        await self.websocket.send(message)

    async def receive(self):
        return await self.websocket.recv()


async def main():
    echo = await EchoWebsocket()
    try:
        await echo.send("Hello!")
        print(await echo.receive())  # "Hello!"
    finally:
        await echo.close()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

您可以在 docs 中找到许多使用 websockets 的示例。