如何限制 Twisted 中的同时连接数
How to limit the number of simultaneous connections in Twisted
所以我构建了一个扭曲的服务器,我想知道限制同时连接数的最佳方法是什么?
我的工厂 return None 是最好的方法吗?当我这样做时,我会抛出很多异常,例如:
exceptions.AttributeError: 'NoneType' object has no attribute 'makeConnection'
我希望以某种方式让客户端排队等待,直到当前连接数下降,但我不知道如何异步执行此操作。
目前我的工厂是这样使用的:
class HandleClientFactory(Factory):
def __init__(self):
self.numConnections = 0
def buildProtocol(self, addr):
#limit connection number here
if self.numConnections >= Max_Clients:
logging.warning("Reached maximum Client connections")
return None
return HandleClient(self)
这有效,但会断开连接而不是等待,并且还会抛出很多未处理的错误。
你必须自己构建它。幸运的是,这些作品大部分都已到位(您可能会要求稍微更合适的作品,但是......)
首先,要避免 AttributeError
(这确实会导致连接关闭),请务必从您的 buildProtocol
方法中 return 一个 IProtocol
提供程序。
class DoesNothing(Protocol):
pass
class YourFactory(Factory):
def buildProtocol(self, addr):
if self.currentConnections < self.maxConnections:
return Factory.buildProtocol(self, addr)
protocol = DoesNothing()
protocol.factory = self
return protocol
如果您使用这个工厂(填写缺失的部分 - 例如,初始化 maxConnections
并因此正确跟踪 currentConnections
)然后您会发现一旦达到限制就会连接的客户端被赋予 DoesNothing
协议。他们可以向该协议发送任意数量的数据。它会把它全部丢弃。它永远不会向他们发送任何数据。它将保持连接打开,直到他们关闭它。简而言之,它什么都不做。
但是,您还希望客户端在连接数低于限制时实际接收服务。
为此,您还需要几块:
- 您必须对他们可能发送的任何数据进行缓冲,以便在您准备好阅读时可以阅读。
- 您必须跟踪连接,以便在时机成熟时开始为它们提供服务。
- 您必须在指定时间开始为他们提供服务。
对于其中的第一个,您可以使用大多数传输的功能 "pause":
class PauseTransport(Protocol):
def makeConnection(self, transport):
transport.pauseProducing()
class YourFactory(Factory):
def buildProtocol(self, addr):
if self.currentConnections < self.maxConnections:
return Factory.buildProtocol(self, addr)
protocol = PauseTransport()
protocol.factory = self
return protocol
PauseTransport
与 DoesNothing
类似,但有一个小的(也是有用的)区别,即一旦连接到传输,它就会告诉传输暂停。因此,永远不会从连接中读取任何数据,并且无论何时您准备好处理它,它都将保持缓冲状态。
对于下一个需求,存在许多可能的解决方案。最简单的一种是使用工厂作为存储:
class PauseAndStoreTransport(Protocol):
def makeConnection(self, transport):
transport.pauseProducing()
self.factory.addPausedTransport(transport)
class YourFactory(Factory):
def buildProtocol(self, addr):
# As above
...
def addPausedTransport(self, transport):
self.transports.append(transport)
同样,通过正确的设置(例如,初始化 transports
属性),您现在有一个所有传输的列表,这些传输对应于您已经接受的超过并发限制的连接正在等待服务。
对于最后一个要求,所需要做的就是实例化和初始化实际能够为您的客户提供服务的协议。实例化很容易(这是你的协议,你可能知道它是如何工作的)。初始化主要是调用 makeConnection
方法:
class YourFactory(Factory):
def buildProtocol(self, addr):
# As above
...
def addPausedTransport(self, transport):
# As above
...
def oneConnectionDisconnected(self)
self.currentConnections -= 1
if self.currentConnections < self.maxConnections:
transport = self.transports.pop(0)
protocol = self.buildProtocol(address)
protocol.makeConnection(transport)
transport.resumeProducing()
我省略了跟踪 buildProtocol
所需的 address
参数的细节(transport
从它的原点到程序的这一部分,如果您的程序确实需要,应该清楚如何对原始地址值执行类似的操作。
除此之外,这里发生的所有事情就是您使用下一个排队的传输(如果需要,您可以使用不同的调度算法,例如 LIFO)并将其连接到您选择的协议,就像 Twisted 所做的那样.最后,撤消之前的暂停操作,以便数据开始流动。
或者……差不多。这将非常巧妙,除非 Twisted 传输实际上没有公开任何方式来更改它们 将数据传送 到的协议。因此,正如所写的那样,来自客户端的数据实际上将被传送到原始的 PauseAndStoreTransport
协议实例。您可以解决这个问题("hack" 显然是正确的词)。将传输 和 PauseAndStoreTransport
实例都存储在工厂的列表中,然后:
def oneConnectionDisconnected(self)
self.currentConnections -= 1
if self.currentConnections < self.maxConnections:
originalProtocol, transport = self.transports.pop(0)
newProtocol = self.buildProtocol(address)
originalProtocol.dataReceived = newProtocol.dataReceived
originalProtocol.connectionLost = newProtocol.connectionLost
newProtocol.makeConnection(transport)
transport.resumeProducing()
现在传输想要调用方法的对象的方法已经被你想要调用方法的对象的方法替换了。同样,这显然是黑客攻击。如果你愿意,你可以把一些不那么骇人听闻的东西放在一起(例如,第三个协议 class 明确支持委托给另一个协议)。这个想法是一样的 - 它只会让你的键盘更加磨损。对于它的价值,我怀疑使用 Tubes 做类似的事情可能既容易又少打字,但我现在将尝试基于该库的解决方案留给其他人。
我已经避免解决保持 currentConnections
正确更新的问题。由于您的问题中已经包含 numConnections
,我假设您知道如何管理该部分。我在最后一步所做的就是假设你执行递减步骤的方式是在工厂上调用 oneConnectionDisconnected
。
我还避免解决排队连接变得无聊并消失的事件。这将主要按书面方式工作 - Twisted 不会注意到连接已关闭,直到您调用 resumeProducing
然后 connectionLost
将在您的应用程序协议上调用。这应该没问题,因为您的协议无论如何都需要处理丢失的连接。
所以我构建了一个扭曲的服务器,我想知道限制同时连接数的最佳方法是什么?
我的工厂 return None 是最好的方法吗?当我这样做时,我会抛出很多异常,例如:
exceptions.AttributeError: 'NoneType' object has no attribute 'makeConnection'
我希望以某种方式让客户端排队等待,直到当前连接数下降,但我不知道如何异步执行此操作。
目前我的工厂是这样使用的:
class HandleClientFactory(Factory):
def __init__(self):
self.numConnections = 0
def buildProtocol(self, addr):
#limit connection number here
if self.numConnections >= Max_Clients:
logging.warning("Reached maximum Client connections")
return None
return HandleClient(self)
这有效,但会断开连接而不是等待,并且还会抛出很多未处理的错误。
你必须自己构建它。幸运的是,这些作品大部分都已到位(您可能会要求稍微更合适的作品,但是......)
首先,要避免 AttributeError
(这确实会导致连接关闭),请务必从您的 buildProtocol
方法中 return 一个 IProtocol
提供程序。
class DoesNothing(Protocol):
pass
class YourFactory(Factory):
def buildProtocol(self, addr):
if self.currentConnections < self.maxConnections:
return Factory.buildProtocol(self, addr)
protocol = DoesNothing()
protocol.factory = self
return protocol
如果您使用这个工厂(填写缺失的部分 - 例如,初始化 maxConnections
并因此正确跟踪 currentConnections
)然后您会发现一旦达到限制就会连接的客户端被赋予 DoesNothing
协议。他们可以向该协议发送任意数量的数据。它会把它全部丢弃。它永远不会向他们发送任何数据。它将保持连接打开,直到他们关闭它。简而言之,它什么都不做。
但是,您还希望客户端在连接数低于限制时实际接收服务。
为此,您还需要几块:
- 您必须对他们可能发送的任何数据进行缓冲,以便在您准备好阅读时可以阅读。
- 您必须跟踪连接,以便在时机成熟时开始为它们提供服务。
- 您必须在指定时间开始为他们提供服务。
对于其中的第一个,您可以使用大多数传输的功能 "pause":
class PauseTransport(Protocol):
def makeConnection(self, transport):
transport.pauseProducing()
class YourFactory(Factory):
def buildProtocol(self, addr):
if self.currentConnections < self.maxConnections:
return Factory.buildProtocol(self, addr)
protocol = PauseTransport()
protocol.factory = self
return protocol
PauseTransport
与 DoesNothing
类似,但有一个小的(也是有用的)区别,即一旦连接到传输,它就会告诉传输暂停。因此,永远不会从连接中读取任何数据,并且无论何时您准备好处理它,它都将保持缓冲状态。
对于下一个需求,存在许多可能的解决方案。最简单的一种是使用工厂作为存储:
class PauseAndStoreTransport(Protocol):
def makeConnection(self, transport):
transport.pauseProducing()
self.factory.addPausedTransport(transport)
class YourFactory(Factory):
def buildProtocol(self, addr):
# As above
...
def addPausedTransport(self, transport):
self.transports.append(transport)
同样,通过正确的设置(例如,初始化 transports
属性),您现在有一个所有传输的列表,这些传输对应于您已经接受的超过并发限制的连接正在等待服务。
对于最后一个要求,所需要做的就是实例化和初始化实际能够为您的客户提供服务的协议。实例化很容易(这是你的协议,你可能知道它是如何工作的)。初始化主要是调用 makeConnection
方法:
class YourFactory(Factory):
def buildProtocol(self, addr):
# As above
...
def addPausedTransport(self, transport):
# As above
...
def oneConnectionDisconnected(self)
self.currentConnections -= 1
if self.currentConnections < self.maxConnections:
transport = self.transports.pop(0)
protocol = self.buildProtocol(address)
protocol.makeConnection(transport)
transport.resumeProducing()
我省略了跟踪 buildProtocol
所需的 address
参数的细节(transport
从它的原点到程序的这一部分,如果您的程序确实需要,应该清楚如何对原始地址值执行类似的操作。
除此之外,这里发生的所有事情就是您使用下一个排队的传输(如果需要,您可以使用不同的调度算法,例如 LIFO)并将其连接到您选择的协议,就像 Twisted 所做的那样.最后,撤消之前的暂停操作,以便数据开始流动。
或者……差不多。这将非常巧妙,除非 Twisted 传输实际上没有公开任何方式来更改它们 将数据传送 到的协议。因此,正如所写的那样,来自客户端的数据实际上将被传送到原始的 PauseAndStoreTransport
协议实例。您可以解决这个问题("hack" 显然是正确的词)。将传输 和 PauseAndStoreTransport
实例都存储在工厂的列表中,然后:
def oneConnectionDisconnected(self)
self.currentConnections -= 1
if self.currentConnections < self.maxConnections:
originalProtocol, transport = self.transports.pop(0)
newProtocol = self.buildProtocol(address)
originalProtocol.dataReceived = newProtocol.dataReceived
originalProtocol.connectionLost = newProtocol.connectionLost
newProtocol.makeConnection(transport)
transport.resumeProducing()
现在传输想要调用方法的对象的方法已经被你想要调用方法的对象的方法替换了。同样,这显然是黑客攻击。如果你愿意,你可以把一些不那么骇人听闻的东西放在一起(例如,第三个协议 class 明确支持委托给另一个协议)。这个想法是一样的 - 它只会让你的键盘更加磨损。对于它的价值,我怀疑使用 Tubes 做类似的事情可能既容易又少打字,但我现在将尝试基于该库的解决方案留给其他人。
我已经避免解决保持 currentConnections
正确更新的问题。由于您的问题中已经包含 numConnections
,我假设您知道如何管理该部分。我在最后一步所做的就是假设你执行递减步骤的方式是在工厂上调用 oneConnectionDisconnected
。
我还避免解决排队连接变得无聊并消失的事件。这将主要按书面方式工作 - Twisted 不会注意到连接已关闭,直到您调用 resumeProducing
然后 connectionLost
将在您的应用程序协议上调用。这应该没问题,因为您的协议无论如何都需要处理丢失的连接。