一次扭曲许多内联回调

Twisted many inlineCallbacks at once

简单介绍一下我的情况: 我正在编写一个服务器(twisted powered)来处理与多个客户端(超过 1000 个)的 WebSocket 连接。从服务器发送到客户端的消息在该流程中通过 Redis pub/subinterface 处理(因为消息可以通过 REST 应用):

现在,当客户端连接并注册时,我正在为每个客户端启动 inlineCallback 以扫过队列,如下所示:

    @inlineCallbacks
    def client_queue_handler(self, uuid):
        queue = self.send_queue[uuid]
        client = self.get_client(uuid)
        while True:
            uniqueID = yield queue.get()          
            client_context = self.redis_handler.get_single(uuid)
            msg_context = next(iter([msg
                                for msg in client_context
                                if msg['unique'] == unique]),
                                    None)

            client.sendMessage(msg_context)

正如我之前所说,许多客户端可能会连接。这完全没问题,每个客户端都有自己的 inlineCallback 来执行无限循环吗?据我所知,twisted 有可定制的线程池限制。如果客户端(inlineCallbacks)多于线程池中的线程,会发生什么情况?请问queue.get() block/sleep 那"virtual thread" 又把控制权交给另一个?也许一个 "global" 席卷所有客户端的线程是更好的选择?

inlineCallbacks 不会启动任何 OS 线程。它只是使用 Deferred 的不同界面。 Deferred 只是处理回调的 API。

queue.get()returns一个Deferred。当您放弃它时,inlineCallbacks 会在内部向它添加一个回调,您的函数将保持暂停状态。当回调触发时,inlineCallbacks 使用传递给回调的值恢复您的函数 - 这是您产生的 Deferred 的 "result"。

所发生的一切是正在创建一些 Deferred 对象并向它们添加一些回调。在你的 redis 客户端实现中的某个地方,一些事件源是 "firing" Deferred,其结果是开始调用其回调的过程。

您可以拥有任意多个: * 因为你有系统内存可以保存 * 因为 redis 客户端可以一次跟踪

不知道你的redis客户端具体是怎么实现的。如果它必须为每个队列打开一个套接字,那么您可能会受到可以打开的文件描述符数量或系统可以支持的套接字数量的限制。这些数字将以数万计,当您遇到它们时,您可以部署一些技巧来进一步提高限制。

如果它不必为每个队列打开一个套接字(例如,如果它可以通过单个套接字为所有队列复用通知)那么它可能有一个非常高的限制(可能由其实现中最慢部分的算法复杂度)。