asyncio as_yielded from async generators

I want to be able to yield results from multiple async coroutines. Asyncio's as_completed is somewhat close to what I'm looking for (i.e. I want any of the coroutines to be able to return to the caller at any point and then keep going), but it only seems to allow regular coroutines with a single return.

Here's what I have so far:

import asyncio


async def test(id_):
    print(f'{id_} sleeping')
    await asyncio.sleep(id_)
    return id_


async def test_gen(id_):
    count = 0
    while True:
        print(f'{id_} sleeping')
        await asyncio.sleep(id_)
        yield id_
        count += 1
        if count > 5:
            return


async def main():
    runs = [test(i) for i in range(3)]

    for i in asyncio.as_completed(runs):
        i = await i
        print(f'{i} yielded')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()

Replacing runs = [test(i) for i in range(3)] with runs = [test_gen(i) for i in range(3)], and having for i in asyncio.as_completed(runs) iterate over each yield, is what I'm after.
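In other words, something like this (just to illustrate the intent; it doesn't actually work, because async generator objects are not awaitables and asyncio.as_completed rejects them with a TypeError):

async def main():
    runs = [test_gen(i) for i in range(3)]

    # Desired behaviour, but not supported: as_completed() wraps each
    # item with ensure_future(), which only accepts awaitables, not
    # async generator objects.
    for value in asyncio.as_completed(runs):
        value = await value
        print(f'{value} yielded')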

Is this possible to express in Python, and are there any third-party libraries that give you more options for coroutine flows than the standard library?

Thanks

You can use aiostream.stream.merge:

from aiostream import stream

async def main():
    runs = [test_gen(i) for i in range(3)]
    async for x in stream.merge(*runs):
        print(f'{x} yielded')

Run it in a safe context to make sure the generators are cleaned up properly once the iteration is over:

async def main():
    runs = [test_gen(i) for i in range(3)]
    merged = stream.merge(*runs)
    async with merged.stream() as streamer:
        async for x in streamer:
            print(f'{x} yielded')

Or make it more compact using pipes:

from aiostream import stream, pipe

async def main():
    runs = [test_gen(i) for i in range(3)]
    await (stream.merge(*runs) | pipe.print('{} yielded'))

There are more examples in the documentation.
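For completeness, on Python 3.7+ any of the main coroutines above can be driven with asyncio.run instead of the manual loop handling from the question (a sketch, assuming test_gen and one of the main definitions above are in scope):

import asyncio

if __name__ == '__main__':
    # Replaces the get_event_loop / run_until_complete / close sequence
    asyncio.run(main())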


Addressing @nirvana-msu's comment

It is possible to identify which generator yielded a given value by preparing the sources accordingly:

async def main():
    runs = [test_gen(i) for i in range(3)]
    # Bind i as a default argument so each lambda keeps its own index
    # (a plain closure over i would see the final value for every source)
    sources = [stream.map(xs, lambda x, i=i: (i, x)) for i, xs in enumerate(runs)]
    async for i, x in stream.merge(*sources):
        print(f'ID {i}: {x}')
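As for the other part of the question, merging async generators can also be expressed with the standard library alone, at the cost of some boilerplate. A minimal sketch, assuming the test_gen from the question is in scope; merge_gens is a hypothetical helper, not an asyncio or aiostream API:

import asyncio


async def merge_gens(*gens):
    # Each generator is drained by its own task into a shared queue;
    # a per-generator sentinel tells the consumer when all are done.
    queue = asyncio.Queue()
    finished = object()

    async def drain(gen):
        try:
            async for item in gen:
                await queue.put(item)
        finally:
            await queue.put(finished)

    tasks = [asyncio.ensure_future(drain(gen)) for gen in gens]
    remaining = len(tasks)
    try:
        while remaining:
            item = await queue.get()
            if item is finished:
                remaining -= 1
            else:
                yield item
    finally:
        # Make sure the draining tasks (and their generators) are
        # cleaned up if the consumer stops iterating early.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


async def main():
    runs = [test_gen(i) for i in range(3)]
    async for x in merge_gens(*runs):
        print(f'{x} yielded')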