监控功能完成并与客户交谈
Monitor function completion and talk to client
我想我正在做一些类似编程的异步操作,但我不确定如何解决这个问题。我在新线程上有一个 function X
运行,在主线程上有一个扭曲的 tcp 服务器。我想在 function X
完成工作时向客户端传递一条消息。代码看起来像这样:
val = None
def x():
global val
#do something to val and send message to client that val is ready
class Server(protocol.Protocol):
def connectionMade(self):
self.transport.write(b'REDPY*')
def dataReceived(self, data):
print("Packet: ",data)
class EchoFactory(protocol.Factory):
def buildProtocol(self, addr):
print("new client")
return Server()
threading.Thread(target=x).start()
endpoints.serverFromString(reactor, "tcp:1235").listen(EchoFactory())
reactor.run()
我研究了扭曲和事件处理中的延迟,我不太确定该使用什么。
您基本上是在这里进行多线程编程。在主线程中使用 Twisted 并不重要。您仍然需要一些线程安全的通信原语。 Twisted 可能有所作为的唯一方法是它是否提供了一些工具来实现您的目标 而无需 使用线程。你的问题和你的例子都没有明确你的目标是什么所以我不能在那个方向提供任何指导(所以,下次,考虑在你的问题中更具体和详细可能会得到更好的答案)。
您可以使用 Queue
作为线程安全(-ish)线程间通信机制。
import threading
from Queue import Queue
def x(reactor, queue):
while True:
item = queue.get()
# ... processing
threading.Thread(target=x).start()
# ...
reactor.run()
您也可以跳过长运行 线程,只使用 Twisted 的线程池进行短期操作。
a_deferred = deferToThread(some_task, ...)
a_deferred.addCallback(deal_with_result_of_task)
将消息从一个任务传递到另一个任务,然后通过网络传递给客户端是很棘手的。当涉及线程时,感觉就像您在暮光之城中工作 :) 撇开糟糕的笑话不谈,这里有一个基于您的示例的简单片段:
from queue import Queue, Empty
import threading
from twisted.internet import endpoints, protocol, reactor, task
def x(shutdown_event, queue):
while not shutdown_event.is_set():
# do something to val and send message to client that val is ready
queue.put(b'data')
print('thread done')
class Server(protocol.Protocol):
def connectionMade(self):
# append client to list in factory
self.factory.clients.append(self)
self.transport.write(b'REDPY*')
def connectionLost(self, reason):
# remove client from list after disconnection
self.factory.clients.remove(self)
def dataReceived(self, data):
print("Packet: ",data)
class EchoFactory(protocol.Factory):
protocol = Server
def __init__(self):
self.clients = []
def pop_from_queue(queue, factory):
try:
data = queue.get(timeout=0.5)
# process data after this point
# loop through the clients and send them stuff
for client in factory.clients:
# NOTE: ensure the data sent is in bytes
client.transport.write(data)
except Empty:
pass
def main():
factory = EchoFactory()
queue = Queue()
event = threading.Event()
# when the reactor stops, set Event so that threads know to stop
reactor.addSystemEventTrigger('before', 'shutdown', event.set)
# periodically check and pop from the queue
periodic_task = task.LoopingCall(pop_from_queue, queue, factory)
periodic_task.start(1)
threading.Thread(target=x, args=(event, queue)).start()
endpoints.serverFromString(reactor, "tcp:1235").listen(factory)
main()
reactor.run()
你必须从中得到的基本东西是:
线程之间有一个共享的 Queue
对象
Factory
对象包含 clients
的列表
删除了 buildFactory
重载并仅在 EchoFactory
中设置了 protocol
属性。在这种情况下,您不需要做任何花哨的事情,因此最好保留它。此外,默认情况下,Twisted 将在协议中设置一个 self.factory
属性。
当客户端连接时它附加到 factory.clients
,客户端在 connectionLost
后被删除
有一个 LoopingCall
任务会 pop_from_queue
定期
我还添加了一个额外的关闭 Event
,它是在反应器关闭时设置的,并且可以正常关闭线程。如果有什么不明白的请评论,我会更详细地解释。
我想我正在做一些类似编程的异步操作,但我不确定如何解决这个问题。我在新线程上有一个 function X
运行,在主线程上有一个扭曲的 tcp 服务器。我想在 function X
完成工作时向客户端传递一条消息。代码看起来像这样:
val = None
def x():
global val
#do something to val and send message to client that val is ready
class Server(protocol.Protocol):
def connectionMade(self):
self.transport.write(b'REDPY*')
def dataReceived(self, data):
print("Packet: ",data)
class EchoFactory(protocol.Factory):
def buildProtocol(self, addr):
print("new client")
return Server()
threading.Thread(target=x).start()
endpoints.serverFromString(reactor, "tcp:1235").listen(EchoFactory())
reactor.run()
我研究了扭曲和事件处理中的延迟,我不太确定该使用什么。
您基本上是在这里进行多线程编程。在主线程中使用 Twisted 并不重要。您仍然需要一些线程安全的通信原语。 Twisted 可能有所作为的唯一方法是它是否提供了一些工具来实现您的目标 而无需 使用线程。你的问题和你的例子都没有明确你的目标是什么所以我不能在那个方向提供任何指导(所以,下次,考虑在你的问题中更具体和详细可能会得到更好的答案)。
您可以使用 Queue
作为线程安全(-ish)线程间通信机制。
import threading
from Queue import Queue
def x(reactor, queue):
while True:
item = queue.get()
# ... processing
threading.Thread(target=x).start()
# ...
reactor.run()
您也可以跳过长运行 线程,只使用 Twisted 的线程池进行短期操作。
a_deferred = deferToThread(some_task, ...)
a_deferred.addCallback(deal_with_result_of_task)
将消息从一个任务传递到另一个任务,然后通过网络传递给客户端是很棘手的。当涉及线程时,感觉就像您在暮光之城中工作 :) 撇开糟糕的笑话不谈,这里有一个基于您的示例的简单片段:
from queue import Queue, Empty
import threading
from twisted.internet import endpoints, protocol, reactor, task
def x(shutdown_event, queue):
while not shutdown_event.is_set():
# do something to val and send message to client that val is ready
queue.put(b'data')
print('thread done')
class Server(protocol.Protocol):
def connectionMade(self):
# append client to list in factory
self.factory.clients.append(self)
self.transport.write(b'REDPY*')
def connectionLost(self, reason):
# remove client from list after disconnection
self.factory.clients.remove(self)
def dataReceived(self, data):
print("Packet: ",data)
class EchoFactory(protocol.Factory):
protocol = Server
def __init__(self):
self.clients = []
def pop_from_queue(queue, factory):
try:
data = queue.get(timeout=0.5)
# process data after this point
# loop through the clients and send them stuff
for client in factory.clients:
# NOTE: ensure the data sent is in bytes
client.transport.write(data)
except Empty:
pass
def main():
factory = EchoFactory()
queue = Queue()
event = threading.Event()
# when the reactor stops, set Event so that threads know to stop
reactor.addSystemEventTrigger('before', 'shutdown', event.set)
# periodically check and pop from the queue
periodic_task = task.LoopingCall(pop_from_queue, queue, factory)
periodic_task.start(1)
threading.Thread(target=x, args=(event, queue)).start()
endpoints.serverFromString(reactor, "tcp:1235").listen(factory)
main()
reactor.run()
你必须从中得到的基本东西是:
线程之间有一个共享的
Queue
对象Factory
对象包含clients
的列表
删除了
buildFactory
重载并仅在EchoFactory
中设置了protocol
属性。在这种情况下,您不需要做任何花哨的事情,因此最好保留它。此外,默认情况下,Twisted 将在协议中设置一个self.factory
属性。当客户端连接时它附加到
factory.clients
,客户端在connectionLost
后被删除
有一个
LoopingCall
任务会pop_from_queue
定期
我还添加了一个额外的关闭 Event
,它是在反应器关闭时设置的,并且可以正常关闭线程。如果有什么不明白的请评论,我会更详细地解释。