在 daemonized twisted 中正确使用 API 和线程
Correct use of API with threading in daemonized twisted
我正在使用 API 使用 threading.Thread 的服务,我想在其中使用 Deferreds。
如果我 运行 它像一个标准的 python 模块,我没有任何问题。类似于:
from twisted.internet import reactor
from outside import ServiceUsingThreadingAndDeferred
service = ServiceUsingThreadingAndDeferred()
reactor.listenTCP(port, protocol_factory)
service.start()
reactor.run()
而如果我 运行 使用 twistd -y 下面的 .tac 服务根本无法工作:
from twisted.application import internet, service
from outside import ServiceUsingThreadingAndDeferred
service = ServiceUsingThreadingAndDeferred()
# Application set-up
application = service.Application("appName")
my_server = internet.TCPServer(port, protocol_factory)
my_server.setServiceParent(application)
service.start()
我认为第二种情况的问题是主反应器线程本身没有生成服务线程,但是我不明白为什么(如果)它发生在第一种情况中......我用过callLater 作为解决方法 - 成功:
from twisted.application import internet, service
from outside import ServiceUsingThreadingAndDeferred
from twisted.internet import reactor
service = ServiceUsingThreadingAndDeferred()
# Application set-up
application = service.Application("appName")
my_server = internet.TCPServer(port, protocol_factory)
my_server.setServiceParent(application)
reactor.callLater(1, service.start)
但我不知道这是否是解决此问题的正确方法。你有什么建议吗?
来自 Github 存储库,这个 class 滥用了 Twisted 的线程 API:
class ServiceUsingThreadingAndDeferred():
def __init__(self):
pass
def start(self):
print "3rd party API service starting..."
self.run_as_thread()
def run_as_thread(self, *args, **kwargs):
t = threading.Thread(target=self.run_forever, args=args, kwargs=kwargs)
t.daemon = True
t.start()
def run_forever(self):
while 1:
print "Doing something remote..."
time.sleep(1)
now = time.time()
if 1 > now % 5 >= 0:
self.defer_activity()
def defer_activity(self):
threads.deferToThread(self._activity)
ServiceUsingThreadingAndDeferred.run_forever
运行s 在非反应器线程中。它调用 defer_activity
而调用 threads.deferToThread
。不允许在非 reactor 线程中调用 threads.deferToThread
。大约有一个 Twisted API 可以安全地在非反应器线程中调用:reactor.callFromThread
(它在反应器线程中安排对其参数 运行 的调用)。
working.tac
做同样的事情,但幸运的是,似乎可以在某些版本的 Twisted 上工作。它依赖于在与 callLater
的实现方式交互的非反应器线程中调用 threads.deferToThread
所导致的未定义行为。不能保证它可以完全工作,也不能保证该行为可以跨 Twisted 版本或平台移植。
如果你想从非反应器线程使用反应器的线程池,你需要这样写:
from twisted.internet.threads import (
blockingCallFromThread,
deferToThread,
)
d = blockingCallFromThread(reactor, lambda: deferToThread(self._activity))
但是,您也不能在非反应器线程中使用 d
的任何方法(deferToThread
返回的 Deferred
)。
最有可能的是,如果可能的话,您应该重写 ServiceUsingThreadingAndDeferred
的逻辑,使其与反应器兼容,然后您就可以避免所有这些恶作剧。
我正在使用 API 使用 threading.Thread 的服务,我想在其中使用 Deferreds。
如果我 运行 它像一个标准的 python 模块,我没有任何问题。类似于:
from twisted.internet import reactor
from outside import ServiceUsingThreadingAndDeferred
service = ServiceUsingThreadingAndDeferred()
reactor.listenTCP(port, protocol_factory)
service.start()
reactor.run()
而如果我 运行 使用 twistd -y 下面的 .tac 服务根本无法工作:
from twisted.application import internet, service
from outside import ServiceUsingThreadingAndDeferred
service = ServiceUsingThreadingAndDeferred()
# Application set-up
application = service.Application("appName")
my_server = internet.TCPServer(port, protocol_factory)
my_server.setServiceParent(application)
service.start()
我认为第二种情况的问题是主反应器线程本身没有生成服务线程,但是我不明白为什么(如果)它发生在第一种情况中......我用过callLater 作为解决方法 - 成功:
from twisted.application import internet, service
from outside import ServiceUsingThreadingAndDeferred
from twisted.internet import reactor
service = ServiceUsingThreadingAndDeferred()
# Application set-up
application = service.Application("appName")
my_server = internet.TCPServer(port, protocol_factory)
my_server.setServiceParent(application)
reactor.callLater(1, service.start)
但我不知道这是否是解决此问题的正确方法。你有什么建议吗?
来自 Github 存储库,这个 class 滥用了 Twisted 的线程 API:
class ServiceUsingThreadingAndDeferred():
def __init__(self):
pass
def start(self):
print "3rd party API service starting..."
self.run_as_thread()
def run_as_thread(self, *args, **kwargs):
t = threading.Thread(target=self.run_forever, args=args, kwargs=kwargs)
t.daemon = True
t.start()
def run_forever(self):
while 1:
print "Doing something remote..."
time.sleep(1)
now = time.time()
if 1 > now % 5 >= 0:
self.defer_activity()
def defer_activity(self):
threads.deferToThread(self._activity)
ServiceUsingThreadingAndDeferred.run_forever
运行s 在非反应器线程中。它调用 defer_activity
而调用 threads.deferToThread
。不允许在非 reactor 线程中调用 threads.deferToThread
。大约有一个 Twisted API 可以安全地在非反应器线程中调用:reactor.callFromThread
(它在反应器线程中安排对其参数 运行 的调用)。
working.tac
做同样的事情,但幸运的是,似乎可以在某些版本的 Twisted 上工作。它依赖于在与 callLater
的实现方式交互的非反应器线程中调用 threads.deferToThread
所导致的未定义行为。不能保证它可以完全工作,也不能保证该行为可以跨 Twisted 版本或平台移植。
如果你想从非反应器线程使用反应器的线程池,你需要这样写:
from twisted.internet.threads import (
blockingCallFromThread,
deferToThread,
)
d = blockingCallFromThread(reactor, lambda: deferToThread(self._activity))
但是,您也不能在非反应器线程中使用 d
的任何方法(deferToThread
返回的 Deferred
)。
最有可能的是,如果可能的话,您应该重写 ServiceUsingThreadingAndDeferred
的逻辑,使其与反应器兼容,然后您就可以避免所有这些恶作剧。