如何在 python 3.5+ 中将异步生成器合并到普通生成器中

How to merge async generators into a vanilla generator in python 3.5+

我在组合异步生成器和实际上 运行ning 它们时遇到了问题。这是因为我发现 运行 它们的唯一方法是通过一个 returns 可迭代的事件循环而不是生成器。让我用一个简单的例子来说明这一点:

假设我有一个函数 google_search,它通过抓取来搜索 google(我不是故意使用 API)。它接收一个搜索字符串和 returns 一个搜索结果生成器。这个生成器不会在页面结束时结束,函数会继续到下一页。因此 google_search 函数 returns 可能是一个几乎无穷无尽的生成器(从技术上讲它总是会结束,但通常你可以在 google 上搜索数百万次)

def google_search(search_string):
    # Basically uses requests/aiohttp and beautifulsoup
    # to parse the resulting html and yield search results
    # Assume this function works
    ......

好的,所以现在我想创建一个函数来迭代多个 google_search 生成器。我想要这样的东西:

def google_searches(*search_strings):
    for results in zip(google_search(query) for query in search_strings):
        yield results

这样我就可以使用一个简单的 for 循环展开 google_searches 并得到我的结果。上面的代码运行良好,但对于任何相当大数量的搜索来说都非常慢。该代码发送第一次搜索请求,然后是第二次搜索,依此类推,直到最后产生结果。我想加快速度(很多)。我的第一个想法是将 google_searches 更改为异步函数(我正在使用 python 3.6.3 并且可以使用 await/async 等)。然后这会创建一个异步生成器,它很好,但我只能在另一个异步函数或事件循环中 运行 它。并且 运行 在事件循环中使用 run_until_complete(loop.gather(...)) returns 结果列表而不是普通生成器,这违背了目的可能有太多的搜索结果无法保存在一个列表中。

如何通过异步执行请求,同时仍然是普通生成器来使 google_searches 函数更快(最好使用异步代码,但欢迎使用任何代码)? 提前致谢!

def google_search(search_string):
    # Basically uses requests/aiohttp and beautifulsoup

这是普通的同步发电机。您可以在其中使用 requests,但如果您想使用异步 aiohttp,则需要使用 async def.

定义的 asynchronous generator

迭代多个异步生成器的结果更有趣。您不能使用普通的 zip 因为它适用于普通的可迭代对象,而不是异步可迭代对象。所以你应该实现你自己的(这也将支持并发迭代)。

我制作了一个小原型,我认为它可以满足您的需求:

import asyncio
import aiohttp
import time


# async versions of some builtins:
async def anext(aiterator):
    try:
        return await aiterator.__anext__()
    except StopAsyncIteration as exc:
        raise exc


def aiter(aiterable):
    return aiterable.__aiter__()


async def azip(*iterables):
    iterators = [aiter(it) for it in iterables]
    while iterators:
        results = await asyncio.gather(
            *[anext(it) for it in iterators],
            return_exceptions=True,
        )
        yield tuple(results)


# emulating grabbing:
async def request(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.text()


async def google_search(search_string):
    for i in range(999):  # big async generator
        url = 'http://httpbin.org/delay/{}'.format(i)  # increase delay to better see concurency
        j = await request(url)
        yield search_string + ' ' + str(i)


async def google_searches(*search_strings):
    async for results in azip(*[google_search(s) for s in search_strings]):
        for result in results:
            yield result


# test it works:
async def main():
    async for result in google_searches('first', 'second', 'third'):
        print(result, int(time.time()))


loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
    loop.run_until_complete(loop.shutdown_asyncgens())
finally:
    loop.close()

输出:

first 0 1514759561
second 0 1514759561
third 0 1514759561
first 1 1514759562
second 1 1514759562
third 1 1514759562
first 2 1514759564
second 2 1514759564
third 2 1514759564
first 3 1514759567
second 3 1514759567
third 3 1514759567

时间显示不同的搜索 运行 同时发生。

接受的答案在再次调用生成器之前等待每个异步生成器的一个结果。如果数据没有以完全相同的速度出现,那可能是个问题。下面的解决方案采用多个异步迭代器(生成器或非生成器)并在多个协程中同时迭代所有这些迭代器。每个协程将结果放在一个asyncio.Queue中,然后由客户端代码迭代:

迭代器代码:

import asyncio
from async_timeout import timeout

class MergeAsyncIterator:
    def __init__(self, *it, timeout=60, maxsize=0):
        self._it = [self.iter_coro(i) for i in it]
        self.timeout = timeout
        self._futures = []
        self._queue = asyncio.Queue(maxsize=maxsize)

    def __aiter__(self):
        for it in self._it:
            f = asyncio.ensure_future(it)
            self._futures.append(f)
        return self

    async def __anext__(self):
        if all(f.done() for f in self._futures) and self._queue.empty():
            raise StopAsyncIteration
        with timeout(self.timeout):
            try:
                return await self._queue.get()
            except asyncio.CancelledError:
                raise StopAsyncIteration

    def iter_coro(self, it):
        if not hasattr(it, '__aiter__'):
            raise ValueError('Object passed must be an AsyncIterable')
        return self.aiter_to_queue(it)

    async def aiter_to_queue(self, ait):
        async for i in ait:
            await self._queue.put(i)
            await asyncio.sleep(0)

示例客户端代码:

import random
import asyncio
from datetime import datetime

async def myaiter(name):
    for i in range(5):
        n = random.randint(0, 3)
        await asyncio.sleep(0.1 + n)
        yield (name, n)
    yield (name, 'DONE')

async def main():
    aiters = [myaiter(i) for i in 'abc']
    async for i in MergeAsyncIterator(*aiters, timeout=3):
        print(datetime.now().strftime('%H:%M:%S.%f'), i)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

输出:

14:48:28.638975 ('a', 1)
14:48:29.638822 ('b', 2)
14:48:29.741651 ('b', 0)
14:48:29.742013 ('a', 1)
14:48:30.639588 ('c', 3)
14:48:31.742705 ('c', 1)
14:48:31.847440 ('b', 2)
14:48:31.847828 ('a', 2)
14:48:31.847960 ('c', 0)
14:48:32.950166 ('c', 1)
14:48:33.948791 ('a', 2)
14:48:34.949339 ('b', 3)
14:48:35.055487 ('c', 2)
14:48:35.055928 ('c', 'DONE')
14:48:36.049977 ('a', 2)
14:48:36.050481 ('a', 'DONE')
14:48:37.050415 ('b', 2)
14:48:37.050966 ('b', 'DONE')

PS:上面的代码使用了async_timeoutthird-party库。
PS2:aiostream 库与上述代码的功能相同,而且更多。

我只是将我刚才编写的解决方案粘贴到这里,因为我总是以这个问题结束只是为了记住我之前已经解决了这个问题。

async def iterator_merge(iterators: typing.Dict[typing.AsyncIterator, typing.Optional[asyncio.Future]]):
while iterators:
    for iterator, value in list(iterators.items()):
        if not value:
            iterators[iterator] = asyncio.ensure_future(iterator.__anext__())

    tasks, _ = await asyncio.wait(iterators.values(), return_when=asyncio.FIRST_COMPLETED)
    for task in tasks:
        # We send the result up
        try:
            res = task.result()
            yield res
        except StopAsyncIteration:
            # We remove the task from the list
            for it, old_next in list(iterators.items()):
                if task is old_next:
                    logger.debug(f'Iterator {it} finished consuming')
                    iterators.pop(it)
        else:
            # We remove the task from the key
            for it, old_next in list(iterators.items()):
                if task is old_next:
                    iterators[it] = None

它有打字注释,但我认为这是一个很好的解决方案。它意味着用你的异步生成器作为键来调用,如果你有任何等待的话,还有一个未来。

iterators = {
    k8s_stream_pod_log(name=name): None,
    k8s_stream_pod_events(name=name): None,
}

你可以在 github.com/txomon/abot.

中找到我的使用方法