Python 3.6: islice 的异步版本?
Python 3.6: async version of islice?
我正在尝试做这样的事情:
import asyncio
from itertools import islice
async def generate_numbers(n):
for x in range(n):
yield x
async def consume_numbers(n):
async for x in generate_numbers(n):
print(x)
async def consume_some_numbers(n,m):
async for x in islice(generate_numbers(n),m): #<-- This doesn't work. islice doesn't recognize async iterators as iterators.
print(x)
loop = asyncio.get_event_loop()
loop.run_until_complete(consume_numbers(10))
loop.run_until_complete(consume_some_numbers(10,5))
有没有办法让它工作,或者至少获得类似的功能?
这里尝试实现 asyncio 友好 islice(和枚举):
import asyncio
import sys
import random
async def aenumerate(aiterable):
i = 0
async for x in aiterable:
yield i, x
i += 1
async def aislice(aiterable, *args):
s = slice(*args)
it = iter(range(s.start or 0, s.stop or sys.maxsize, s.step or 1))
try:
nexti = next(it)
except StopIteration:
return
async for i, element in aenumerate(aiterable):
if i == nexti:
yield element
try:
nexti = next(it)
except StopIteration:
return
async def generate_numbers(n):
for x in range(n):
await asyncio.sleep(random.uniform(0.1, 0.4))
yield x
async def consume_numbers(tag, n):
print(tag, "start")
async for x in generate_numbers(n):
print(tag, x)
print(tag, "done")
async def consume_some_numbers(tag, n, a, b, step=1):
print(tag, "start")
async for x in aislice(generate_numbers(n), a, b, step):
print(tag, x)
print(tag, "done")
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([
consume_numbers("A", 5),
consume_numbers("B", 10),
consume_some_numbers("C", 10, 0, 5),
consume_some_numbers("D", 30, 3, 20, 4),
consume_some_numbers("E", 10, 3, 8, 2),
]))
loop.close()
这未在真实世界的应用程序中测试,欢迎评论:-)
aiostream library provides generator-based operators for asynchronous iteration. See this example using stream.take:
import asyncio
from aiostream import stream
async def generate_numbers(n):
for x in range(n):
yield x
async def consume_some_numbers(n,m):
async for x in stream.take(generate_numbers(n), m):
print(x)
loop = asyncio.get_event_loop()
loop.run_until_complete(consume_some_numbers(10, 5))
所有流运算符return 一个增强的异步可迭代对象,提供切片支持等额外功能。考虑以下示例:
import asyncio
from aiostream import stream
async def main():
xs = stream.count()
ys = xs[5:10:2]
async for y in ys:
print(y) # Prints 5, 7, 9
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
在此 demonstration and the documentation 中查看更多示例。
我正在尝试做这样的事情:
import asyncio
from itertools import islice
async def generate_numbers(n):
for x in range(n):
yield x
async def consume_numbers(n):
async for x in generate_numbers(n):
print(x)
async def consume_some_numbers(n,m):
async for x in islice(generate_numbers(n),m): #<-- This doesn't work. islice doesn't recognize async iterators as iterators.
print(x)
loop = asyncio.get_event_loop()
loop.run_until_complete(consume_numbers(10))
loop.run_until_complete(consume_some_numbers(10,5))
有没有办法让它工作,或者至少获得类似的功能?
这里尝试实现 asyncio 友好 islice(和枚举):
import asyncio
import sys
import random
async def aenumerate(aiterable):
i = 0
async for x in aiterable:
yield i, x
i += 1
async def aislice(aiterable, *args):
s = slice(*args)
it = iter(range(s.start or 0, s.stop or sys.maxsize, s.step or 1))
try:
nexti = next(it)
except StopIteration:
return
async for i, element in aenumerate(aiterable):
if i == nexti:
yield element
try:
nexti = next(it)
except StopIteration:
return
async def generate_numbers(n):
for x in range(n):
await asyncio.sleep(random.uniform(0.1, 0.4))
yield x
async def consume_numbers(tag, n):
print(tag, "start")
async for x in generate_numbers(n):
print(tag, x)
print(tag, "done")
async def consume_some_numbers(tag, n, a, b, step=1):
print(tag, "start")
async for x in aislice(generate_numbers(n), a, b, step):
print(tag, x)
print(tag, "done")
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([
consume_numbers("A", 5),
consume_numbers("B", 10),
consume_some_numbers("C", 10, 0, 5),
consume_some_numbers("D", 30, 3, 20, 4),
consume_some_numbers("E", 10, 3, 8, 2),
]))
loop.close()
这未在真实世界的应用程序中测试,欢迎评论:-)
aiostream library provides generator-based operators for asynchronous iteration. See this example using stream.take:
import asyncio
from aiostream import stream
async def generate_numbers(n):
for x in range(n):
yield x
async def consume_some_numbers(n,m):
async for x in stream.take(generate_numbers(n), m):
print(x)
loop = asyncio.get_event_loop()
loop.run_until_complete(consume_some_numbers(10, 5))
所有流运算符return 一个增强的异步可迭代对象,提供切片支持等额外功能。考虑以下示例:
import asyncio
from aiostream import stream
async def main():
xs = stream.count()
ys = xs[5:10:2]
async for y in ys:
print(y) # Prints 5, 7, 9
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
在此 demonstration and the documentation 中查看更多示例。