如何跨多个服务器/工作者管理 websockets

How to manage websockets across multiple servers / workers

aiohttp has built-in support for websockets。非常简单,效果很好。

文档中示例的简化版本是:

async def handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    # Async iterate the messages the client sends
    async for message in ws:
        ws.send_str('You sent: %s' % (message.data,))

    print('websocket connection closed')

在示例中,ws 是对与客户端的 websocket 连接的引用。我可以轻松地将此引用放入 request.app,如 @Crandel does here(即全局状态),但不能放在生产应用程序中,因为每个应用程序服务器(甚至每个工作人员)都有自己的 app实例.

是否有可接受的模式?还有别的办法吗?

注意:我指的不是会话。我指的是连接。当服务器 B 中的应用程序代码等发生事件时,我想向连接到服务器 A 的客户端发送消息。

所以我只熟悉 Node 中的 Socket.IO 但是使用 Socket.IO.

水平扩展 websockets 相当容易

套接字可以带有会话,因此每个会话都由特定的服务器管理。这样可以轻松保存每个打开的套接字的状态,并在所有服务器之间进行负载平衡。

这是 Python 的 SocketIO:

https://pypi.python.org/pypi/socketIO-client

这里有一篇非常好的读物,介绍如何将会话附加到 redis-store 以使其更快,并且跨服务器的负载平衡更易于管理。

How to share sessions with Socket.IO 1.x and Express 4.x?

我知道这不能回答您关于 aiohttp 的问题,但希望这能让您更好地了解套接字如何工作。

编辑: 用 Node-

编写

在 Socket.IO 中这真的很简单,它有大量的功能可以以各种不同的方式广播消息。

例如,如果您想向每个聊天室中的每个人发送一条消息。例如,每个打开了套接字的人都可以轻松编写。

socket.broadcast.emit('WARNING', "this is a test");

假设您有开放的房间,您可以使用名为 .to() 的简单功能仅向该房间内的人广播消息。示例 我有一个名为 'BBQ':

的房间
socket.broadcast.to('BBQ').emit('invitation', 'Come get some food!');

这将在 BBQ 频道中向所有人发送消息 - 来吃点东西吧!

编辑:编辑:

这是一篇关于 Socket.IO 工作原理的精彩文章,请务必阅读函数更新版本的第二个答案。它比他们的文档更容易阅读。

Send response to all clients except sender (Socket.io)

据我所知,这也是 python 实现中的工作原理。为了便于使用,我肯定会将它用于 websockets。 aiohttp看起来真的很强大,但要么没有这个功能,要么埋在文档中,要么只写在代码中,还没有任何文档。

更新(2017 年 2 月)

频道(幸运的是)没有合并到 Django 中。它可能仍然是一个伟大的项目,但它并不真正属于 Django 本身。

此外,我强烈建议您查看 Postgres 相对较新的内置支持 pub/sub。它将 probably outperform anything else,并在 aiohttp 之上构建自定义解决方案,使用 Postgres 作为支持服务,可能是您最好的选择。

原创

虽然不是aiohttp,Django Channels, which is likely to be merged into Django 1.10, solves this problem in a very intuitive way, and it's written by Andrew Godwin, the author of Django migrations

Django Channels 通过在 Django 应用程序前面创建路由层来抽象 "many processes on many servers" 的概念。该路由层与后端(例如 Redis)对话,以在进程之间维护可共享状态,并使用新的 ASGI protocol to facilitate handling both HTTP requests and WebSockets, while delegating each to their respective "consumers”(例如,附带一个用于 HTTP 请求的内置处理程序,您可以编写自己的对于 WebSockets)。

Django Channels 有一个叫做 Groups 的概念,它处理问题的 "broadcast" 性质;也就是说,它允许服务器上发生的事件向该组中的客户端触发消息,而不管它们是否连接到相同或不同的进程或服务器。

恕我直言,Django Channels 很可能被抽象成一个更通用的 Python 库。有一个 couple other Python libraries that achieve Go-like Channels,但是在撰写本文时,没有什么值得注意的东西可以提供网络透明性;通道在进程和服务器之间进行通信的能力。

如果我对你的理解是正确的,你希望有多个 websocket 服务器,每个服务器都连接了多个客户端,但你希望能够与所有连接的客户端进行潜在通信。

这是一个示例,它创建了三个简单的服务器——一个大写回显、一个随机报价和一天中的时间——然后向所有连接的客户端发送广播消息。也许这里面有一些有用的想法。

粘贴站:https://pastebin.com/xDSACmdV

#!/usr/bin/env python3
"""
Illustrates how to have multiple websocket servers running and send
messages to all their various clients at once.

In response to Whosebug question:


Pastebin: https://pastebin.com/xDSACmdV
"""
import asyncio
import datetime
import random
import time
import webbrowser

import aiohttp
from aiohttp import web

__author__ = "Robert Harder"
__email__ = "rob@iharder.net"
__license__ = "Public Domain"


def main():
    # Create servers
    cap_srv = CapitalizeEchoServer(port=9990)
    rnd_srv = RandomQuoteServer(port=9991)
    tim_srv = TimeOfDayServer(port=9992)

    # Queue their start operation
    loop = asyncio.get_event_loop()
    loop.create_task(cap_srv.start())
    loop.create_task(rnd_srv.start())
    loop.create_task(tim_srv.start())

    # Open web pages to test them
    webtests = [9990, 9991, 9991, 9992, 9992]
    for port in webtests:
        url = "http://www.websocket.org/echo.html?location=ws://localhost:{}".format(port)
        webbrowser.open(url)
    print("Be sure to click 'Connect' on the webpages that just opened.")

    # Queue a simulated broadcast-to-all message
    def _alert_all(msg):
        print("Sending alert:", msg)
        msg_dict = {"alert": msg}
        cap_srv.broadcast_message(msg_dict)
        rnd_srv.broadcast_message(msg_dict)
        tim_srv.broadcast_message(msg_dict)

    loop.call_later(17, _alert_all, "ALL YOUR BASE ARE BELONG TO US")

    # Run event loop
    loop.run_forever()


class MyServer:
    def __init__(self, port):
        self.port = port  # type: int
        self.loop = None  # type: asyncio.AbstractEventLoop
        self.app = None  # type: web.Application
        self.srv = None  # type: asyncio.base_events.Server

    async def start(self):
        self.loop = asyncio.get_event_loop()
        self.app = web.Application()
        self.app["websockets"] = []  # type: [web.WebSocketResponse]
        self.app.router.add_get("/", self._websocket_handler)
        await self.app.startup()
        handler = self.app.make_handler()
        self.srv = await asyncio.get_event_loop().create_server(handler, port=self.port)
        print("{} listening on port {}".format(self.__class__.__name__, self.port))

    async def close(self):
        assert self.loop is asyncio.get_event_loop()
        self.srv.close()
        await self.srv.wait_closed()

        for ws in self.app["websockets"]:  # type: web.WebSocketResponse
            await ws.close(code=aiohttp.WSCloseCode.GOING_AWAY, message='Server shutdown')

        await self.app.shutdown()
        await self.app.cleanup()

    async def _websocket_handler(self, request):
        assert self.loop is asyncio.get_event_loop()
        ws = web.WebSocketResponse()
        await ws.prepare(request)
        self.app["websockets"].append(ws)

        await self.do_websocket(ws)

        self.app["websockets"].remove(ws)
        return ws

    async def do_websocket(self, ws: web.WebSocketResponse):
        async for ws_msg in ws:  # type: aiohttp.WSMessage
            pass

    def broadcast_message(self, msg: dict):
        for ws in self.app["websockets"]:  # type: web.WebSocketResponse
            ws.send_json(msg)


class CapitalizeEchoServer(MyServer):
    """ Echoes back to client whatever they sent, but capitalized. """

    async def do_websocket(self, ws: web.WebSocketResponse):
        async for ws_msg in ws:  # type: aiohttp.WSMessage
            cap = ws_msg.data.upper()
            ws.send_str(cap)


class RandomQuoteServer(MyServer):
    """ Sends a random quote to the client every so many seconds. """
    QUOTES = ["Wherever you go, there you are.",
              "80% of all statistics are made up.",
              "If a tree falls in the woods, and no one is around to hear it, does it make a noise?"]

    def __init__(self, interval: float = 10, *kargs, **kwargs):
        super().__init__(*kargs, **kwargs)
        self.interval = interval

    async def do_websocket(self, ws: web.WebSocketResponse):
        async def _regular_interval():
            while self.srv.sockets is not None:
                quote = random.choice(RandomQuoteServer.QUOTES)
                ws.send_json({"quote": quote})
                await asyncio.sleep(self.interval)

        self.loop.create_task(_regular_interval())

        await super().do_websocket(ws)  # leave client connected here indefinitely


class TimeOfDayServer(MyServer):
    """ Sends a message to all clients simultaneously about time of day. """

    async def start(self):
        await super().start()

        async def _regular_interval():
            while self.srv.sockets is not None:
                if int(time.time()) % 10 == 0:  # Only on the 10 second mark
                    timestamp = "{:%Y-%m-%d %H:%M:%S}".format(datetime.datetime.now())
                    self.broadcast_message({"timestamp": timestamp})
                await asyncio.sleep(1)

        self.loop.create_task(_regular_interval())


if __name__ == "__main__":
    main()