如何处理协程函数的多个结果?

How to handle multiple results from a coroutine function?

我有一些生成器在做一些搜索,我用另一个生成器将它们包装起来:

def searching_stuff_1():
    # searching
    yield 1
    # and searching
    yield 2
    yield 3

def searching_stuff_2():
    yield 4
    yield 5


def gen():
    yield from searching_stuff_1()
    yield from searching_stuff_2()

for result in gen():
    print(result)

所以现在我想知道如何将它重写为异步版本,它可以在 searching_stuff_1 和 searching_stuff_2 中产生多个值。

我正在尝试:

import asyncio

async def searching_stuff_1():
    f = asyncio.Future()
    result = []
    await asyncio.sleep(1)
    #searching
    result.append(1)
    #searching
    result.append(2)
    result.append(3)
    f.set_result(result)
    return f

async def searching_stuff_2():
    f = asyncio.Future()
    result = []
    await asyncio.sleep(1)
    result.append(4)
    result.append(5)
    f.set_result(result)
    return f

async def producer():
    coros = [searching_stuff_1(), searching_stuff_2()]
    for future in asyncio.as_completed(coros):
        yield await future

async def consumer(xs):
    async for future in xs:
        for r in future.result():
            print(r)
loop = asyncio.get_event_loop()
loop.run_until_complete(consumer(producer()))
loop.close()

但是,在这个版本中,我必须将所有结果附加到一个列表中并将其包装在一个 Future 实例中。我想知道是否有更好的方法来处理协程函数的多个结果。我还能得到这些数字吗?

是的,您仍然可以在异步版本中生成这些数字,这是 Asynchronous Generators, You can use async for(PEP492) and Asynchronous Comprehensions(PEP530),像这样,从您的第一个示例重写。虽然这需要 python 版本高于或等于 3.6

import asyncio


async def searching_stuff_1():
    # searching
    yield 1
    # and searching
    yield 2
    yield 3


async def searching_stuff_2():
    yield 4
    yield 5


async def gen():
    async for i in searching_stuff_1():
        yield i
    async for i in searching_stuff_2():
        yield i


async def gen_all():
    return [i async for i in gen()]

if __name__ == "__main__":
    result = asyncio.get_event_loop().run_until_complete(gen_all())
    print(result)

对于希望运行两个异步生成器异步,可以使用asyncio.gather.
但是由于 asyncio.gather 仅以异步方式收集协程的结果,因此在调用 asyncio.gather、

之前,您需要将异步生成器产生的每个结果分别与 async def gen2 组合
async def gen2(coro):
    return [i async for i in coro()]


# combine two async
async def gen_all2():
    return list(chain.from_iterable(await gather(gen2(searching_stuff_1), gen2(searching_stuff_2))))

为了证明我的观点,我们可以把searching_stuff改成:

async def searching_stuff_1():
    print("searching_stuff_1 begin")
    # searching
    yield 1
    await asyncio.sleep(1)
    # and searching
    yield 2
    yield 3
    print("searching_stuff_1 end")

async def searching_stuff_2():
    print("searching_stuff_2 begin")
    yield 4
    await asyncio.sleep(1)
    yield 5
    print("searching_stuff_2 end")

然后移动:

result = asyncio.get_event_loop().run_until_complete(gen_all())
print(result)

> searching_stuff_2 begin
> searching_stuff_1 begin
> searching_stuff_2 end
> searching_stuff_1 end
> [1, 2, 3, 4, 5]