使用 asyncio 协同程序进行方法链接

Method chaining with asyncio coroutines

我想实现 method chaining,但不是为了通常的功能 - 对于 asyncio 协程。

import asyncio


class Browser:
    @asyncio.coroutine
    def go(self):
        # some actions
        return self

    @asyncio.coroutine
    def click(self):
        # some actions
        return self

"Intuitive" 调用链的方式不起作用,因为单一方法 returns 协程(生成器),而不是 self:

@asyncio.coroutine
def main():
    br = yield from Browser().go().click()  # this will fail

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

调用链的正确方法是:

br = yield from (yield from Browser().go()).click()

但是当链增长时它看起来很难看并且变得不可读。

有什么办法可以做得更好吗?欢迎任何想法。

它仍然不是特别漂亮,但您可以实现一个 chain 缩放得更好一点的函数:

import asyncio  

@asyncio.coroutine
def chain(obj, *funcs):
    for f, *args in funcs:
        meth = getattr(obj, f)  # Look up the method on the object
        obj = yield from meth(*args) 
    return obj

class Browser:
    @asyncio.coroutine
    def go(self, x, y):
        return self

    @asyncio.coroutine
    def click(self):
        return self


@asyncio.coroutine
def main():
        #br = yield from (yield from Browser().go(3, 4)).click()
        br = yield from chain(Browser(), 
                                ("go", 3, 4),
                                ("click",))

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

想法是将 (method_name, arg1, arg2, argX) 格式的元组传递给 chain 函数,而不是实际链接方法调用本身。如果您不需要支持向链中的任何方法传递参数,则可以直接传递方法名称。

我创建了解决方案,可以完成所需的工作。想法是使用 Browser() 的包装器,它使用 __getattr____call__ 来收集动作(比如获取属性或调用)和 return self 来捕捉下一个动作。收集完所有动作后,我们 "catch" yiled from wrapper 使用 __iter__ 并处理所有收集到的动作。

import asyncio


def chain(obj):
    """
    Enables coroutines chain for obj.
    Usage: text = yield from chain(obj).go().click().attr
    Note: Returns not coroutine, but object that can be yield from.
    """
    class Chain:
        _obj = obj
        _queue = []

        # Collect getattr of call to queue:
        def __getattr__(self, name):
            Chain._queue.append({'type': 'getattr', 'name': name})
            return self

        def __call__(self, *args, **kwargs):
            Chain._queue.append({'type': 'call', 'params': [args, kwargs]})
            return self

        # On iter process queue:
        def __iter__(self):
            res = Chain._obj
            while Chain._queue:
                action = Chain._queue.pop(0)
                if action['type'] == 'getattr':
                    res = getattr(res, action['name'])
                elif action['type'] == 'call':
                    args, kwargs = action['params']
                    res = res(*args, **kwargs)
                if asyncio.iscoroutine(res):
                    res = yield from res
            return res
    return Chain()

用法:

class Browser:
    @asyncio.coroutine
    def go(self):
        print('go')
        return self

    @asyncio.coroutine
    def click(self):
        print('click')
        return self

    def text(self):
        print('text')
        return 5


@asyncio.coroutine
def main():
    text = yield from chain(Browser()).go().click().go().text()
    print(text)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

输出:

go
click
go
text
5

请注意,chain() 不是 return 真正的协程,而是可以像 yield from 上的协程一样使用的对象。我们应该包装 chain() 的结果以获得正常的协程,它可以传递给任何需要协程的 asyncio 函数:

@asyncio.coroutine
def chain_to_coro(chain):
    return (yield from chain)


@asyncio.coroutine
def main():
    ch = chain(Browser()).go().click().go().text()
    coro = chain_to_coro(ch)

    results = yield from asyncio.gather(*[coro], return_exceptions=True)
    print(results)

输出:

go
click
go
text
[5]