Python 3.6: islice 的异步版本?

Python 3.6: async version of islice?

我正在尝试做这样的事情:

import asyncio
from itertools import islice

async def generate_numbers(n):
    """Async generator producing the integers ``0, 1, ..., n - 1`` in order.

    Nothing is yielded when ``n`` is zero or negative.
    """
    numbers = range(n)
    for number in numbers:
        yield number


async def consume_numbers(n):
    """Print every value produced by ``generate_numbers(n)``, one per line."""
    numbers = generate_numbers(n)
    async for number in numbers:
        print(number)

async def consume_some_numbers(n,m):
    """Intended to print only the first ``m`` values of ``generate_numbers(n)``.

    This is the failing example the question is about; it is kept as-is.
    """
    # NOTE: the next line doesn't work. islice() doesn't recognize async
    # iterators as iterators, so it cannot wrap generate_numbers(n).
    async for x in islice(generate_numbers(n),m):
        print(x)


# Run the two demos one after the other on the loop returned by
# asyncio.get_event_loop().
loop = asyncio.get_event_loop()
loop.run_until_complete(consume_numbers(10))
# This second call exercises the islice() line that is reported as not working.
loop.run_until_complete(consume_some_numbers(10,5))

有没有办法让它工作,或者至少获得类似的功能?

下面尝试实现一个对 asyncio 友好的 islice(以及 enumerate):

import asyncio
import sys

import random


async def aenumerate(aiterable, start=0):
    """Asynchronous counterpart of the built-in ``enumerate``.

    Args:
        aiterable: Any object that can be consumed with ``async for``.
        start: Index paired with the first element. Defaults to ``0``,
            mirroring the ``start`` argument of ``enumerate``.

    Yields:
        ``(index, element)`` tuples; ``index`` grows by one for every element
        pulled from ``aiterable``.
    """
    index = start
    async for element in aiterable:
        yield index, element
        index += 1


async def aislice(aiterable, *args):
    """Asynchronous counterpart of ``itertools.islice``.

    Accepts the same positional forms as ``islice``: ``(stop,)``,
    ``(start, stop)`` or ``(start, stop, step)``. ``None`` means "use the
    default" for any of them (start ``0``, no upper bound, step ``1``).

    Args:
        aiterable: Any object that can be consumed with ``async for``.
        *args: One to three slice arguments as described above.

    Yields:
        The elements of ``aiterable`` whose zero-based position is
        ``start, start + step, start + 2 * step, ...`` and below ``stop``.
        Elements before ``start`` and between selected positions are pulled
        from the source and discarded.

    Raises:
        TypeError: If no slice argument (or more than three) is given.
        ValueError: If ``start`` or ``stop`` is negative, or ``step`` is not
            positive. Because this is an async generator, the error surfaces
            on the first iteration step rather than at call time.
    """
    bounds = slice(*args)
    # Compare against None explicitly: a plain ``x or default`` would turn an
    # explicit ``stop=0`` into "no limit" and an explicit ``step=0`` into 1.
    start = 0 if bounds.start is None else bounds.start
    stop = bounds.stop
    step = 1 if bounds.step is None else bounds.step

    if start < 0 or (stop is not None and stop < 0):
        raise ValueError("start and stop for aislice() must be None or >= 0")
    if step < 1:
        raise ValueError("step for aislice() must be None or a positive integer")

    # Empty selection: finish without touching the source at all.
    if stop is not None and start >= stop:
        return

    wanted = start  # position of the next element to hand out
    position = 0    # position of the element currently being inspected
    async for element in aiterable:
        if position == wanted:
            yield element
            wanted += step
            # Stop right after the last selected element so the source is
            # not advanced any further than necessary.
            if stop is not None and wanted >= stop:
                return
        position += 1


async def generate_numbers(n, min_delay=0.1, max_delay=0.4):
    """Asynchronously yield ``0 .. n - 1``, pausing a random time before each.

    Args:
        n: How many consecutive integers to produce, starting at ``0``.
        min_delay: Lower bound, in seconds, of the random pause taken before
            each value. Defaults to ``0.1``.
        max_delay: Upper bound, in seconds, of that pause. Defaults to ``0.4``.

    Yields:
        The integers ``0, 1, ..., n - 1`` in order.
    """
    for x in range(n):
        # The pause hands control back to the event loop, which lets several
        # consumers of this generator interleave their output.
        await asyncio.sleep(random.uniform(min_delay, max_delay))
        yield x


async def consume_numbers(tag, n):
    """Print every value of ``generate_numbers(n)``, each prefixed with ``tag``.

    A ``start`` line is printed before the first value and a ``done`` line
    after the last one, both carrying the same ``tag``.
    """
    print(tag, "start")
    numbers = generate_numbers(n)
    async for number in numbers:
        print(tag, number)
    print(tag, "done")


async def consume_some_numbers(tag, n, a, b, step=1):
    """Print the ``a:b:step`` slice of ``generate_numbers(n)``, tagged with ``tag``.

    The slice is taken with ``aislice``. A ``start`` line is printed before
    the first value and a ``done`` line after the last one.
    """
    print(tag, "start")
    selected = aislice(generate_numbers(n), a, b, step)
    async for number in selected:
        print(tag, number)
    print(tag, "done")


loop = asyncio.get_event_loop()
try:
    # Wrap every coroutine in a Task explicitly: passing bare coroutine
    # objects to asyncio.wait() is deprecated since Python 3.8 and raises
    # TypeError from Python 3.11 onwards.
    tasks = [
        loop.create_task(consume_numbers("A", 5)),
        loop.create_task(consume_numbers("B", 10)),
        loop.create_task(consume_some_numbers("C", 10, 0, 5)),
        loop.create_task(consume_some_numbers("D", 30, 3, 20, 4)),
        loop.create_task(consume_some_numbers("E", 10, 3, 8, 2)),
    ]
    # Run all five consumers concurrently and block until each has finished.
    loop.run_until_complete(asyncio.wait(tasks))
finally:
    # Always release the loop, even if the run above is interrupted.
    loop.close()

在真实世界的应用程序中测试,欢迎评论:-)

The aiostream library provides generator-based operators for asynchronous iteration. See this example using stream.take:

import asyncio
from aiostream import stream

async def generate_numbers(n):
    """Yield each integer below ``n``, counting up from zero, as an async stream."""
    for value in range(0, n):
        yield value

async def consume_some_numbers(n, m):
    """Print what ``stream.take(generate_numbers(n), m)`` produces, one value per line."""
    limited = stream.take(generate_numbers(n), m)
    async for number in limited:
        print(number)

# Run the demo coroutine to completion on the loop returned by
# asyncio.get_event_loop().
loop = asyncio.get_event_loop()
loop.run_until_complete(consume_some_numbers(10, 5))

所有流运算符都会返回一个增强的异步可迭代对象,提供切片支持等额外功能。请看以下示例:

import asyncio
from aiostream import stream

async def main():
    """Slice the ``stream.count()`` stream with ``[5:10:2]`` and print each item."""
    sliced = stream.count()[5:10:2]
    async for value in sliced:
        print(value)  # Prints 5, 7, 9

# Run main() to completion on the loop returned by asyncio.get_event_loop().
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

更多示例请参阅其演示(demonstration)和文档(documentation)。