异步 python itertools 链接多个生成器
asynchronous python itertools chain multiple generators
为清楚起见更新问题:
假设我有 2 个处理生成器函数:
def gen1(): # just for examples,
yield 1 # yields actually carry
yield 2 # different computation weight
yield 3 # in my case
def gen2():
yield 4
yield 5
yield 6
我可以用 itertools 链接它们
from itertools import chain
mix = chain(gen1(), gen2())
然后我可以用它创建另一个生成器函数对象,
def mix_yield():
for item in mix:
yield item
或者如果我只想next(mix)
,它就在那里。
我的问题是,我怎样才能在异步代码中做同样的事情?
因为我需要它:
- return 在 yield 中(一个接一个),或者使用
next
迭代器
- 最快解决收益优先(异步)
上一条。更新:
经过试验和研究,我发现 aiostream 库声明为 itertools 的异步版本,所以我做了:
import asyncio
from aiostream import stream
async def gen1():
await asyncio.sleep(0)
yield 1
await asyncio.sleep(0)
yield 2
await asyncio.sleep(0)
yield 3
async def gen2():
await asyncio.sleep(0)
yield 4
await asyncio.sleep(0)
yield 5
await asyncio.sleep(0)
yield 6
a_mix = stream.combine.merge(gen1(),gen2())
async def a_mix_yield():
for item in a_mix:
yield item
可是我还是做不到next(a_mix)
TypeError: 'merge' object is not an iterator
或next(await a_mix)
raise StreamEmpty()
虽然我仍然可以将它列为列表:
print(await stream.list(a_mix))
# [1, 2, 4, 3, 5, 6]
所以一个目标已经完成,还有一个目标要完成:
return 在 yield 中(一个接一个),或者使用 next
迭代器
- 最快解决的收益率优先(异步)
Python 的 next
内置函数只是调用对象基础 __next__
方法的一种便捷方式。 __next__
的异步等价物是异步迭代器上的 __anext__
方法。没有 anext
全局函数,但可以很容易地编写它:
async def anext(aiterator):
return await aiterator.__anext__()
但是节省的空间很小,在极少数情况下需要这样做时,还不如直接调用 __anext__
。异步迭代器又通过调用 __aiter__
从异步 iterable 获得(类似于常规迭代提供的 __iter__
)。手动驱动的异步迭代如下所示:
a_iterator = obj.__aiter__() # regular method
elem1 = await a_iterator.__anext__() # async method
elem2 = await a_iterator.__anext__() # async method
...
当没有更多元素可用时,__anext__
将引发 StopAsyncIteration
。要遍历异步迭代器,应该使用 async for
.
这是一个可运行的示例,基于您的代码,同时使用 __anext__
和 async for
来耗尽使用 aiostream.stream.combine.merge
:
设置的流
async def main():
a_mix = stream.combine.merge(gen1(), gen2())
async with a_mix.stream() as streamer:
mix_iter = streamer.__aiter__()
print(await mix_iter.__anext__())
print(await mix_iter.__anext__())
print('remaining:')
async for x in mix_iter:
print(x)
asyncio.get_event_loop().run_until_complete(main())
我看到了这个答案并查看了 aiostream 库。这是我想出的合并多个异步生成器的代码。它不使用任何库。
async def merge_generators(gens:Set[AsyncGenerator[Any, None]]) -> AsyncGenerator[Any, None]:
pending = gens.copy()
pending_tasks = { asyncio.ensure_future(g.__anext__()): g for g in pending }
while len(pending_tasks) > 0:
done, _ = await asyncio.wait(pending_tasks.keys(), return_when="FIRST_COMPLETED")
for d in done:
try:
result = d.result()
yield result
dg = pending_tasks[d]
pending_tasks[asyncio.ensure_future(dg.__anext__())] = dg
except StopAsyncIteration as sai:
print("Exception in getting result", sai)
finally:
del pending_tasks[d]
希望这对您有所帮助,如果其中有任何错误,请告诉我。
为清楚起见更新问题:
假设我有 2 个处理生成器函数:
def gen1(): # just for examples,
yield 1 # yields actually carry
yield 2 # different computation weight
yield 3 # in my case
def gen2():
yield 4
yield 5
yield 6
我可以用 itertools 链接它们
from itertools import chain
mix = chain(gen1(), gen2())
然后我可以用它创建另一个生成器函数对象,
def mix_yield():
for item in mix:
yield item
或者如果我只想next(mix)
,它就在那里。
我的问题是,我怎样才能在异步代码中做同样的事情?
因为我需要它:
- return 在 yield 中(一个接一个),或者使用
next
迭代器 - 最快解决收益优先(异步)
上一条。更新:
经过试验和研究,我发现 aiostream 库声明为 itertools 的异步版本,所以我做了:
import asyncio
from aiostream import stream
async def gen1():
await asyncio.sleep(0)
yield 1
await asyncio.sleep(0)
yield 2
await asyncio.sleep(0)
yield 3
async def gen2():
await asyncio.sleep(0)
yield 4
await asyncio.sleep(0)
yield 5
await asyncio.sleep(0)
yield 6
a_mix = stream.combine.merge(gen1(),gen2())
async def a_mix_yield():
for item in a_mix:
yield item
可是我还是做不到next(a_mix)
TypeError: 'merge' object is not an iterator
或next(await a_mix)
raise StreamEmpty()
虽然我仍然可以将它列为列表:
print(await stream.list(a_mix))
# [1, 2, 4, 3, 5, 6]
所以一个目标已经完成,还有一个目标要完成:
return 在 yield 中(一个接一个),或者使用
next
迭代器- 最快解决的收益率优先(异步)
Python 的 next
内置函数只是调用对象基础 __next__
方法的一种便捷方式。 __next__
的异步等价物是异步迭代器上的 __anext__
方法。没有 anext
全局函数,但可以很容易地编写它:
async def anext(aiterator):
return await aiterator.__anext__()
但是节省的空间很小,在极少数情况下需要这样做时,还不如直接调用 __anext__
。异步迭代器又通过调用 __aiter__
从异步 iterable 获得(类似于常规迭代提供的 __iter__
)。手动驱动的异步迭代如下所示:
a_iterator = obj.__aiter__() # regular method
elem1 = await a_iterator.__anext__() # async method
elem2 = await a_iterator.__anext__() # async method
...
当没有更多元素可用时,__anext__
将引发 StopAsyncIteration
。要遍历异步迭代器,应该使用 async for
.
这是一个可运行的示例,基于您的代码,同时使用 __anext__
和 async for
来耗尽使用 aiostream.stream.combine.merge
:
async def main():
a_mix = stream.combine.merge(gen1(), gen2())
async with a_mix.stream() as streamer:
mix_iter = streamer.__aiter__()
print(await mix_iter.__anext__())
print(await mix_iter.__anext__())
print('remaining:')
async for x in mix_iter:
print(x)
asyncio.get_event_loop().run_until_complete(main())
我看到了这个答案并查看了 aiostream 库。这是我想出的合并多个异步生成器的代码。它不使用任何库。
async def merge_generators(gens:Set[AsyncGenerator[Any, None]]) -> AsyncGenerator[Any, None]:
pending = gens.copy()
pending_tasks = { asyncio.ensure_future(g.__anext__()): g for g in pending }
while len(pending_tasks) > 0:
done, _ = await asyncio.wait(pending_tasks.keys(), return_when="FIRST_COMPLETED")
for d in done:
try:
result = d.result()
yield result
dg = pending_tasks[d]
pending_tasks[asyncio.ensure_future(dg.__anext__())] = dg
except StopAsyncIteration as sai:
print("Exception in getting result", sai)
finally:
del pending_tasks[d]
希望这对您有所帮助,如果其中有任何错误,请告诉我。