在 Twisted 中重用延迟对象

Re-using deferred objects in Twisted

在 Twisted 中,延迟对象似乎只能在其回调触发后使用一次,这与我使用过的其他基于 "promise" 的库相反:

from twisted.internet import defer


class Foo(object):

    def __init__(self):
        self.dfd = defer.Deferred()

    def async(self):
        return self.dfd

    @defer.inlineCallbacks
    def func(self):
        print 'Started!'

        result = yield self.async()

        print 'Stopped with result: {0}'.format(result)

if __name__ == '__main__':
    foo = Foo()
    foo.func()

    import time
    time.sleep(3)

    foo.dfd.callback('3 seconds passed!')

    foo.func()

在标准输出中,一个有:

$ Started!
$ Stopped with result: 3 seconds passed!
$ Started!
$ Stopped with result: None

在我的情况下,我希望 funcreactor 线程中被一次又一次地调用。有什么方法可以确保 yield 调用始终 return 延迟对象的 "resolved" 值而不引入额外的状态,如果是这样,最优雅的方法是什么?

更新

根据下面的建议,我实现了一个装饰器解决方案:

import functools


def recycles_deferred(deferred_getter):
    """Given a callable deferred_getter that returns a deferred object, create
    another function that returns a 'reusable' version of that deferred object."""
    @functools.wraps(deferred_getter)
    def _recycler(*args, **kwargs):
        old_dfd = deferred_getter(*args, **kwargs)
        new_dfd = defer.Deferred()

        def _recycle(result):
            new_dfd.callback(result)
            return result

        old_dfd.addCallback(_recycle)

        return new_dfd

    return _recycler



if __name__ == '__main__':
    """Demonstration of how this @recycles_deferred should be used."""
    import time

    from twisted.internet import defer

    class O(object):

        def __init__(self):
            """In practice this could representation a network request."""
            self.dfd = defer.Deferred()

        def do_something_with_result(self, result):
            print 'Got result: {0}'.format(result)
            return result

        @recycles_deferred
        def deferred_getter(self):
            """Return the deferred."""
            return self.dfd

        @defer.inlineCallbacks
        def do_something_with_deferred(self):
            result = yield self.deferred_getter()

            print 'Got inline result: {0}'.format(result)


    o = O()

    o.dfd.addCallback(o.do_something_with_result) # Got result: foo
    o.do_something_with_deferred()                # Got inline result: foo
    o.dfd.addCallback(o.do_something_with_result) # Got result: foo

    # sleep 3 seconds, then resolve the deferred
    time.sleep(3)
    o.dfd.callback('foo')

    o.do_something_with_deferred()                # Got inline result: foo
    o.dfd.addCallback(o.do_something_with_result) # Got result: foo

    # the inline call to yield never returns None
    o.do_something_with_deferred() # Got inline result: foo
    o.do_something_with_deferred() # Got inline result: foo
    o.do_something_with_deferred() # Got inline result: foo

问题不在于 Deferred 本身只能是 "used once" - 它可以无限重复使用,因为您可以永远向它添加回调,数据将继续流向下一个回调,以及下一个可用的回调。您看到的问题是,当您向 Deferred 添加回调时, 其结果会传播到下一个回调 .

这里的另一个交叉问题是 yieldinlineCallbacks 函数中 Deferred 假定为 "consume" Deferred - 你'重新获取它的值并用它做一些事情,以防止不必要的资源利用(Deferred 携带结果的时间比它需要的时间长),回调给你 yield 表达式的结果本身也 returns None。如果它 return 某种更明确的 "consumed by inlineCallbacks token",我想可能会更容易理解,但事后看来是 20/20 :-)。

但是从某种意义上说,一个Deferred可以只"used"一次,也就是说,如果你有一个API which returns a Deferred, 应该 return a new Deferred 每个呼叫者。通过 returning 它,您实际上是将所有权转移给调用者,因为调用者可能会修改结果,以传递给他们自己的调用者。典型的例子是,如果你有一个 API,return 是一个用一些字节触发的 Deferred,但你知道字节应该是 JSON,你可以添加.addCallback(json.loads) 然后 return 它,这将允许调用者使用 JSON-序列化对象而不是字节。

因此,如果您打算多次调用 async,您可以采用如下方式:

from __future__ import print_function, unicode_literals

from twisted.internet import defer

class Foo(object):

    def __init__(self):
        self.dfd = defer.Deferred()

    def async(self):
        justForThisCall = defer.Deferred()
        def callbackForDFD(result):
            justForThisCall.callback(result)
            return result
        self.dfd.addCallback(callbackForDFD)
        return justForThisCall

    @defer.inlineCallbacks
    def func(self):
        print('Started!')
        result = yield self.async()
        print('Stopped with result: {0}'.format(result))

if __name__ == '__main__':
    foo = Foo()
    print("calling func")
    foo.func()
    print("firing dfd")
    foo.dfd.callback('no need to wait!')
    print("calling func again")
    foo.func()
    print("done")

应该产生这个输出:

calling func
Started!
firing dfd
Stopped with result: no need to wait!
calling func again
Started!
Stopped with result: no need to wait!
done