使用 Tornado 广播消息

Broadcasting a message using Tornado

我有一个发送消息的应用程序,应该广播给当前连接的每个客户端。假设客户数量可能达到数千。如何在不阻塞的情况下实现这种广播逻辑?

我认为会阻塞的示例代码:

clients = []

class Broadcaster(tornado.websocket.WebSocketHandler):
   def on_message(self, message):
       for client in clients:
           self.write_message(message)

我在网上找到的每个例子都像上面的代码。有一些使用 @gen.coroutine 的例子,但我不明白这个装饰器在这种情况下如何提供帮助。

如果这条消息能像您所说的那样发送到数千人,您就可以避免此 for 循环。我建议您使用 celery 等任务服务来传递消息。 然后它就像:

clients = []

class Broadcaster(tornado.websocket.WebSocketHandler):
   def on_message(self, message):
       for client in clients:
           celery_task(message)

并且celery_task()方法将是一个芹菜任务方法

@app.task
def celery_task(message):
    //logic

然后一个任务将被添加到队列中,每个任务都会将您想要的消息发送给您的客户。 希望对您有所帮助。

WebSocketHandler.write_message 不会在网络 I/O 上阻塞,因此它会非常快,但如果您有大量客户端,它仍然可以加起来。我建议做一些性能测试,看看广播一条消息实际需要多长时间(记住消息的大小也很重要,写字节串比写字典快得多,因为字典将被重新每次编码为 json)。如果结果花费的时间超过了您在应用程序中可以容忍的阻塞,请定期向循环中添加一个 yield gen.moment

@gen.coroutine
def broadcast_message(self, message):
    count = 0
    for client in self.clients:
        client.write_message(message)
        count += 1
        if count % 100 == 0:
            yield gen.moment