来自异步生成器的 asyncio as_yielded
asyncio as_yielded from async generators
我希望能够从多个异步协程中产生结果。 Asyncio 的 as_completed
有点接近我正在寻找的东西(即我希望任何协程能够随时返回调用者然后继续),但这似乎只允许常规协程用一个 return.
这是我目前的情况:
import asyncio
async def test(id_):
print(f'{id_} sleeping')
await asyncio.sleep(id_)
return id_
async def test_gen(id_):
count = 0
while True:
print(f'{id_} sleeping')
await asyncio.sleep(id_)
yield id_
count += 1
if count > 5:
return
async def main():
runs = [test(i) for i in range(3)]
for i in asyncio.as_completed(runs):
i = await i
print(f'{i} yielded')
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
将 runs = [test(i) for i in range(3)]
替换为 runs = [test_gen(i) for i in range(3)]
并让 for i in asyncio.as_completed(runs)
迭代每个产量是我所追求的。
这是否可以在 Python 中表达,是否有任何第三方可以为您提供比协程流程标准库更多的选择?
谢谢
您可以使用 aiostream.stream.merge:
from aiostream import stream
async def main():
runs = [test_gen(i) for i in range(3)]
async for x in stream.merge(*runs):
print(f'{x} yielded')
运行 它在 safe context 中以确保生成器在迭代后被正确清理:
async def main():
runs = [test_gen(i) for i in range(3)]
merged = stream.merge(*runs)
async with merged.stream() as streamer:
async for x in streamer:
print(f'{x} yielded')
或者使用pipes使其更紧凑:
from aiostream import stream, pipe
async def main():
runs = [test_gen(i) for i in range(3)]
await (stream.merge(*runs) | pipe.print('{} yielded'))
documentation 中有更多示例。
处理@nirvana-msu 评论
可以通过相应地准备源来识别产生给定值的生成器:
async def main():
runs = [test_gen(i) for i in range(3)]
sources = [stream.map(xs, lambda x: (i, x)) for i, xs in enumerate(runs)]
async for i, x in stream.merge(*sources):
print(f'ID {i}: {x}')
我希望能够从多个异步协程中产生结果。 Asyncio 的 as_completed
有点接近我正在寻找的东西(即我希望任何协程能够随时返回调用者然后继续),但这似乎只允许常规协程用一个 return.
这是我目前的情况:
import asyncio
async def test(id_):
print(f'{id_} sleeping')
await asyncio.sleep(id_)
return id_
async def test_gen(id_):
count = 0
while True:
print(f'{id_} sleeping')
await asyncio.sleep(id_)
yield id_
count += 1
if count > 5:
return
async def main():
runs = [test(i) for i in range(3)]
for i in asyncio.as_completed(runs):
i = await i
print(f'{i} yielded')
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
将 runs = [test(i) for i in range(3)]
替换为 runs = [test_gen(i) for i in range(3)]
并让 for i in asyncio.as_completed(runs)
迭代每个产量是我所追求的。
这是否可以在 Python 中表达,是否有任何第三方可以为您提供比协程流程标准库更多的选择?
谢谢
您可以使用 aiostream.stream.merge:
from aiostream import stream
async def main():
runs = [test_gen(i) for i in range(3)]
async for x in stream.merge(*runs):
print(f'{x} yielded')
运行 它在 safe context 中以确保生成器在迭代后被正确清理:
async def main():
runs = [test_gen(i) for i in range(3)]
merged = stream.merge(*runs)
async with merged.stream() as streamer:
async for x in streamer:
print(f'{x} yielded')
或者使用pipes使其更紧凑:
from aiostream import stream, pipe
async def main():
runs = [test_gen(i) for i in range(3)]
await (stream.merge(*runs) | pipe.print('{} yielded'))
documentation 中有更多示例。
处理@nirvana-msu 评论
可以通过相应地准备源来识别产生给定值的生成器:
async def main():
runs = [test_gen(i) for i in range(3)]
sources = [stream.map(xs, lambda x: (i, x)) for i, xs in enumerate(runs)]
async for i, x in stream.merge(*sources):
print(f'ID {i}: {x}')