停止反应堆会杀死不同端口上的所有反应堆 运行
Stopping a Reactor kills All Reactors Running on Different Ports
我正在尝试实现此代码,其中每个 "node" 都是独立的 "actor"。
from twisted.internet.protocol import Protocol, Factory
from twisted.internet import reactor, defer
from twisted.protocols.basic import LineReceiver
class ChatProtocol(LineReceiver):
def __init__(self, factory):
self.factory = factory
self.name = None
self.state = "REGISTER"
def connectionMade(self):
self.sendLine("What's your name?")
def connectionLost(self, reason):
if self.name in self.factory.users:
del self.factory.users[self.name]
self.broadcastMessage("{} has left the channel.".format(self.name))
def lineReceived(self, line):
if self.state == "REGISTER":
self.handle_REGISTER(line)
else:
self.handle_CHAT(line)
def handle_REGISTER(self, name):
if name in self.factory.users:
self.sendLine("Name taken, please choose another!")
return
self.sendLine("Welcome, {}".format(name))
self.broadcastMessage("{} has joined the channel.".format(name))
self.name = name
self.factory.users[name] = self
self.state = "CHAT"
def handle_CHAT(self, message):
message = "[%s]>> %s" % (self.name, message)
self.broadcastMessage(message)
def broadcastMessage(self, message):
for name, protocol in self.factory.users.iteritems():
if protocol != self:
protocol.sendLine(message)
class ChatFactory(Factory):
"""Handle all the nodes' connection"""
def __init__(self):
self.users = {}
def buildProtocol(self, addr):
return ChatProtocol(self)
class Node:
def __init__(self, stop=None):
self.Factory = ChatFactory
self.reactor = reactor
self.d = defer.Deferred()
# with `stop` the node is bound to die
if stop:
self.reactor.callLater(stop, self.stop)
def listen(self, port):
self.reactor.listenTCP(port, self.Factory())
def run(self):
self.reactor.run()
def stop(self):
self.reactor.stop()
class Organization:
"""
An organization consists of several nodes, with one node as a leader
"""
def __init__(self):
self.nodes = []
def create_leader(self):
# create first node now with intentionally kill the leader's reactor after 5 seconds
leader_node = Node(5)
leader_node.listen(8000)
self.nodes.append(leader_node)
def create_more_nodes(self):
node_1 = Node()
node_2 = Node()
self.nodes.append(node_1)
self.nodes.append(node_2)
def activate(self):
self.nodes[1].listen(8001)
self.nodes[2].listen(8002)
"""
now leader_node listens at 8000
node_1 listens at 8001
node_2 listens at 8002
"""
# run each node's reactor
for n in self.nodes:
n.run()
if __name__ == '__main__':
org = Organization()
org.create_leader()
org.create_more_nodes()
org.activate()
5 秒后,leader_node
的反应堆通过 Node.stop()
被延迟停止。但是,我不知道为什么 node_1
和 node_2
监听 8001 和 8002 也停止了。如果有更多 Twisted 经验的人可以指出这一点,那就太好了!
reactor.run()
表示 "run the entire program"。虽然它不会强制终止(该函数执行 return),但这样做只是为了让您在退出前清理一些状态。所以你应该只 运行 每个进程一个反应器,并在它完成后不久退出。
如果您想拥有可以关闭所有传入和传出连接以及侦听端口的独立服务,则必须在 connectionMade
和 connectionLost
中跟踪这些连接.您还必须跟踪您的侦听端口,以便您可以 stopListening
.
附带说明一下,listenTCP
是一个非常低级的 API,您可能不应该直接调用它;相反,使用更灵活的高级 Endpoints API。
这是您的代码版本,可跟踪入站和连接以及侦听端口并根据需要关闭它们,同时在所有节点之间共享反应器。
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.protocols.basic import LineReceiver
from twisted.internet.defer import Deferred
class ChatProtocol(LineReceiver):
def __init__(self, factory):
self.factory = factory
self.name = None
self.state = "REGISTER"
def connectionMade(self):
self.factory.node.activeTransports.append(self.transport)
self.sendLine("What's your name?")
def connectionLost(self, reason):
self.factory.node.activeTransports.remove(self.transport)
if self.name in self.factory.users:
del self.factory.users[self.name]
self.broadcastMessage("{} has left the channel.".format(self.name))
def lineReceived(self, line):
if self.state == "REGISTER":
self.handle_REGISTER(line)
else:
self.handle_CHAT(line)
def handle_REGISTER(self, name):
if name in self.factory.users:
self.sendLine("Name taken, please choose another!")
return
self.sendLine("Welcome, {}".format(name))
self.broadcastMessage("{} has joined the channel.".format(name))
self.name = name
self.factory.users[name] = self
self.state = "CHAT"
def handle_CHAT(self, message):
message = "[%s]>> %s" % (self.name, message)
self.broadcastMessage(message)
def broadcastMessage(self, message):
for name, protocol in self.factory.users.iteritems():
if protocol != self:
protocol.sendLine(message)
class ChatFactory(Factory):
"""Handle all the nodes' connection"""
def __init__(self, node):
self.users = {}
self.node = node
def buildProtocol(self, addr):
return ChatProtocol(self)
class Node:
def __init__(self, endpoint, clock, stop=None):
self.Factory = ChatFactory
self._endpoint = endpoint
self._listenStarting = None
self._listeningPort = None
self.activeTransports = []
if stop is not None:
print("Scheduling stop.", stop)
clock.callLater(stop, self.stop)
def listen(self):
self._listenStarting = self._endpoint.listen(self.Factory(self))
def setPort(port):
self._listeningPort = port
def clear(whatever):
self._listenStarting = None
return whatever
self._listenStarting.addCallback(setPort).addBoth(clear)
def stop(self):
if self._listenStarting is not None:
self._listenStarting.cancel()
if self._listeningPort is not None:
self._listeningPort.stopListening()
for transport in self.activeTransports[:]:
transport.abortConnection()
class Organization:
def __init__(self, reactor):
self.reactor = reactor
self.nodes = []
def port(self, number):
return TCP4ServerEndpoint(self.reactor, number)
def create_leader(self):
leader_node = Node(self.port(8000), self.reactor, 5)
leader_node.listen()
self.nodes.append(leader_node)
def create_more_nodes(self):
node_1 = Node(self.port(8001), self.reactor)
node_2 = Node(self.port(8002), self.reactor)
self.nodes.append(node_1)
self.nodes.append(node_2)
def activate(self):
self.nodes[1].listen()
self.nodes[2].listen()
def main(reactor):
org = Organization(reactor)
org.create_leader()
org.create_more_nodes()
org.activate()
return Deferred()
if __name__ == '__main__':
from twisted.internet.task import react
react(main)
我正在尝试实现此代码,其中每个 "node" 都是独立的 "actor"。
from twisted.internet.protocol import Protocol, Factory
from twisted.internet import reactor, defer
from twisted.protocols.basic import LineReceiver
class ChatProtocol(LineReceiver):
def __init__(self, factory):
self.factory = factory
self.name = None
self.state = "REGISTER"
def connectionMade(self):
self.sendLine("What's your name?")
def connectionLost(self, reason):
if self.name in self.factory.users:
del self.factory.users[self.name]
self.broadcastMessage("{} has left the channel.".format(self.name))
def lineReceived(self, line):
if self.state == "REGISTER":
self.handle_REGISTER(line)
else:
self.handle_CHAT(line)
def handle_REGISTER(self, name):
if name in self.factory.users:
self.sendLine("Name taken, please choose another!")
return
self.sendLine("Welcome, {}".format(name))
self.broadcastMessage("{} has joined the channel.".format(name))
self.name = name
self.factory.users[name] = self
self.state = "CHAT"
def handle_CHAT(self, message):
message = "[%s]>> %s" % (self.name, message)
self.broadcastMessage(message)
def broadcastMessage(self, message):
for name, protocol in self.factory.users.iteritems():
if protocol != self:
protocol.sendLine(message)
class ChatFactory(Factory):
"""Handle all the nodes' connection"""
def __init__(self):
self.users = {}
def buildProtocol(self, addr):
return ChatProtocol(self)
class Node:
def __init__(self, stop=None):
self.Factory = ChatFactory
self.reactor = reactor
self.d = defer.Deferred()
# with `stop` the node is bound to die
if stop:
self.reactor.callLater(stop, self.stop)
def listen(self, port):
self.reactor.listenTCP(port, self.Factory())
def run(self):
self.reactor.run()
def stop(self):
self.reactor.stop()
class Organization:
"""
An organization consists of several nodes, with one node as a leader
"""
def __init__(self):
self.nodes = []
def create_leader(self):
# create first node now with intentionally kill the leader's reactor after 5 seconds
leader_node = Node(5)
leader_node.listen(8000)
self.nodes.append(leader_node)
def create_more_nodes(self):
node_1 = Node()
node_2 = Node()
self.nodes.append(node_1)
self.nodes.append(node_2)
def activate(self):
self.nodes[1].listen(8001)
self.nodes[2].listen(8002)
"""
now leader_node listens at 8000
node_1 listens at 8001
node_2 listens at 8002
"""
# run each node's reactor
for n in self.nodes:
n.run()
if __name__ == '__main__':
org = Organization()
org.create_leader()
org.create_more_nodes()
org.activate()
5 秒后,leader_node
的反应堆通过 Node.stop()
被延迟停止。但是,我不知道为什么 node_1
和 node_2
监听 8001 和 8002 也停止了。如果有更多 Twisted 经验的人可以指出这一点,那就太好了!
reactor.run()
表示 "run the entire program"。虽然它不会强制终止(该函数执行 return),但这样做只是为了让您在退出前清理一些状态。所以你应该只 运行 每个进程一个反应器,并在它完成后不久退出。
如果您想拥有可以关闭所有传入和传出连接以及侦听端口的独立服务,则必须在 connectionMade
和 connectionLost
中跟踪这些连接.您还必须跟踪您的侦听端口,以便您可以 stopListening
.
附带说明一下,listenTCP
是一个非常低级的 API,您可能不应该直接调用它;相反,使用更灵活的高级 Endpoints API。
这是您的代码版本,可跟踪入站和连接以及侦听端口并根据需要关闭它们,同时在所有节点之间共享反应器。
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.protocols.basic import LineReceiver
from twisted.internet.defer import Deferred
class ChatProtocol(LineReceiver):
def __init__(self, factory):
self.factory = factory
self.name = None
self.state = "REGISTER"
def connectionMade(self):
self.factory.node.activeTransports.append(self.transport)
self.sendLine("What's your name?")
def connectionLost(self, reason):
self.factory.node.activeTransports.remove(self.transport)
if self.name in self.factory.users:
del self.factory.users[self.name]
self.broadcastMessage("{} has left the channel.".format(self.name))
def lineReceived(self, line):
if self.state == "REGISTER":
self.handle_REGISTER(line)
else:
self.handle_CHAT(line)
def handle_REGISTER(self, name):
if name in self.factory.users:
self.sendLine("Name taken, please choose another!")
return
self.sendLine("Welcome, {}".format(name))
self.broadcastMessage("{} has joined the channel.".format(name))
self.name = name
self.factory.users[name] = self
self.state = "CHAT"
def handle_CHAT(self, message):
message = "[%s]>> %s" % (self.name, message)
self.broadcastMessage(message)
def broadcastMessage(self, message):
for name, protocol in self.factory.users.iteritems():
if protocol != self:
protocol.sendLine(message)
class ChatFactory(Factory):
"""Handle all the nodes' connection"""
def __init__(self, node):
self.users = {}
self.node = node
def buildProtocol(self, addr):
return ChatProtocol(self)
class Node:
def __init__(self, endpoint, clock, stop=None):
self.Factory = ChatFactory
self._endpoint = endpoint
self._listenStarting = None
self._listeningPort = None
self.activeTransports = []
if stop is not None:
print("Scheduling stop.", stop)
clock.callLater(stop, self.stop)
def listen(self):
self._listenStarting = self._endpoint.listen(self.Factory(self))
def setPort(port):
self._listeningPort = port
def clear(whatever):
self._listenStarting = None
return whatever
self._listenStarting.addCallback(setPort).addBoth(clear)
def stop(self):
if self._listenStarting is not None:
self._listenStarting.cancel()
if self._listeningPort is not None:
self._listeningPort.stopListening()
for transport in self.activeTransports[:]:
transport.abortConnection()
class Organization:
def __init__(self, reactor):
self.reactor = reactor
self.nodes = []
def port(self, number):
return TCP4ServerEndpoint(self.reactor, number)
def create_leader(self):
leader_node = Node(self.port(8000), self.reactor, 5)
leader_node.listen()
self.nodes.append(leader_node)
def create_more_nodes(self):
node_1 = Node(self.port(8001), self.reactor)
node_2 = Node(self.port(8002), self.reactor)
self.nodes.append(node_1)
self.nodes.append(node_2)
def activate(self):
self.nodes[1].listen()
self.nodes[2].listen()
def main(reactor):
org = Organization(reactor)
org.create_leader()
org.create_more_nodes()
org.activate()
return Deferred()
if __name__ == '__main__':
from twisted.internet.task import react
react(main)