从订户调用远程过程并解决异步承诺

Calling a remote procedure from a subscriber and resolving the asyncio promise

我在让基于 asyncio 的高速公路 RPC 在事件处理程序中工作时遇到问题:

from autobahn.asyncio import wamp
from autobahn.wamp import register, subscribe

class Foo(wamp.ApplicationSession):
    @subscribe('wamp.metaevent.session.on_join')
    def bar(self):
        baz = yield from self.call('baz')

    @register('baz')
    def baz(self):
        return 'baz'

阅读文档后,我的印象是 应该 有效。但是,如果我在 Foo.bar none 中使用 yield,它的代码就会被执行。我试过用 asyncio.coroutine 装饰各种图案,但根本无法达到 运行。

我发现让它工作的唯一方法是解析返回的 future "manually":

@subscribe('wamp.metaevent.session.on_join')
def bar(self):
    def do_something(f):
        print(f.result())

    f = self.call('baz')
    f.add_done_callback(do_something)

我确定我还没有正确理解 asyncio 编程,所以我必须做什么才能编写 baz = self.call('baz') 并立即获得结果(意味着没有额外的明确声明的回调) ?

存在the yield keyword in the body of a def statement makes the defined function into a generator function

当您调用生成器函数时(如 autobahn 那样),函数体不会被执行。相反,会创建一个 生成器对象 。当您在生成器上调用 next 时,控制前进到下一个 yield 语句。生成器已在其他 Stack Overflow 帖子以及文档和网络上的其他地方进行了广泛讨论。

asyncio 的 API 广泛使用 coroutines and yield from. (For most purposes, you can consider 'coroutine' and 'generator' to be synonymous.) The event loop 跟踪一组生成器并调用 nextsendthrow视情况而定。*

看起来 autobahn.subscribe 需要一个常规回调函数,而不是协程,这就是您的代码未被执行的原因。解决此问题的一种方法是使用 asyncio.async.

编写一个 安排 你的协程的回调函数
class Foo(wamp.ApplicationSession):
    @subscribe('wamp.metaevent.session.on_join')
    def schedule_bar(self):
        coro = self.bar()  # create the coroutine object

        # wrap the coroutine in a Task and register it with the event loop.
        # the loop argument is optional and defaults to asyncio.get_event_loop()
        asyncio.async(coro, loop=my_event_loop)  

    @coroutine
    def bar(self):
        baz = yield from self.call('baz')

    @register('baz')
    def baz(self):
        return 'baz'

如果 autobahn 中没有执行此操作的函数,您可以编写自己的可重用装饰器来为协程订阅 WAMP 主题。

from functools import wraps

def subscribe_coro(uri, loop=None):
    def decorator(f):
        @subscribe(uri)
        @wraps(f)
        def wrapper(*args, **kwargs):
            coro = f(*args, **kwargs)
            asyncio.async(coro, loop=loop)
        return wrapper
    return decorator

现在您的 class 将如下所示:

class Foo(wamp.ApplicationSession):
    @subscribe_coro('wamp.metaevent.session.on_join')
    @coroutine
    def bar(self):
        baz = yield from self.call('baz')

    @register('baz')
    def baz(self):
        return 'baz'

* 这是一个简化。事件循环实际上跟踪 Futures, not coroutines. The algorithm to call the appropriate methods on a generator is implemented by Task,它将协程包装成 Future.