在 daemonized twisted 中正确使用 API 和线程

Correct use of API with threading in daemonized twisted

我正在使用 API 使用 threading.Thread 的服务,我想在其中使用 Deferreds。

如果我 运行 它像一个标准的 python 模块,我没有任何问题。类似于:

from twisted.internet import reactor
from outside import ServiceUsingThreadingAndDeferred

service = ServiceUsingThreadingAndDeferred()

reactor.listenTCP(port, protocol_factory)

service.start()

reactor.run()

而如果我 运行 使用 twistd -y 下面的 .tac 服务根本无法工作:

from twisted.application import internet, service
from outside import ServiceUsingThreadingAndDeferred

service = ServiceUsingThreadingAndDeferred()

# Application set-up
application = service.Application("appName")
my_server = internet.TCPServer(port, protocol_factory)
my_server.setServiceParent(application)

service.start()

我认为第二种情况的问题是主反应器线程本身没有生成服务线程,但是我不明白为什么(如果)它发生在第一种情况中......我用过callLater 作为解决方法 - 成功:

from twisted.application import internet, service
from outside import ServiceUsingThreadingAndDeferred
from twisted.internet import reactor

service = ServiceUsingThreadingAndDeferred()

# Application set-up
application = service.Application("appName")
my_server = internet.TCPServer(port, protocol_factory)
my_server.setServiceParent(application)

reactor.callLater(1, service.start)

但我不知道这是否是解决此问题的正确方法。你有什么建议吗?

来自 Github 存储库,这个 class 滥用了 Twisted 的线程 API:

class ServiceUsingThreadingAndDeferred():
    def __init__(self):
        pass

    def start(self):
        print "3rd party API service starting..."
        self.run_as_thread()

    def run_as_thread(self, *args, **kwargs):
        t = threading.Thread(target=self.run_forever, args=args, kwargs=kwargs)
        t.daemon = True
        t.start()

    def run_forever(self):
        while 1:
            print "Doing something remote..."
            time.sleep(1)
            now = time.time()
            if 1 > now % 5 >= 0:
                self.defer_activity()

    def defer_activity(self):
        threads.deferToThread(self._activity)

ServiceUsingThreadingAndDeferred.run_forever 运行s 在非反应器线程中。它调用 defer_activity 而调用 threads.deferToThread。不允许在非 reactor 线程中调用 threads.deferToThread。大约有一个 Twisted API 可以安全地在非反应器线程中调用:reactor.callFromThread(它在反应器线程中安排对其参数 运行 的调用)。

working.tac 做同样的事情,但幸运的是,似乎可以在某些版本的 Twisted 上工作。它依赖于在与 callLater 的实现方式交互的非反应器线程中调用 threads.deferToThread 所导致的未定义行为。不能保证它可以完全工作,也不能保证该行为可以跨 Twisted 版本或平台移植。

如果你想从非反应器线程使用反应器的线程池,你需要这样写:

from twisted.internet.threads import (
    blockingCallFromThread,
    deferToThread,
)

d = blockingCallFromThread(reactor, lambda: deferToThread(self._activity))

但是,您也不能在非反应器线程中使用 d 的任何方法(deferToThread 返回的 Deferred)。

最有可能的是,如果可能的话,您应该重写 ServiceUsingThreadingAndDeferred 的逻辑,使其与反应器兼容,然后您就可以避免所有这些恶作剧。