Parallel asynchronous IO in Python's coroutines

Simple example: I need to make two unrelated HTTP requests in parallel. What's the simplest way to do that? I would expect it to look like this:

async def do_the_job():
    async with aiohttp.ClientSession() as session:
        coro_1 = session.get('http://httpbin.org/get')
        coro_2 = session.get('http://httpbin.org/ip')
        return combine_responses(await coro_1, await coro_2)

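In other words, I want to initiate IO operations and then wait for their results, so that they effectively run in parallel. This can be achieved with asyncio.gather:
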
async def do_the_job():
    async with aiohttp.ClientSession() as session:
        coro_1 = session.get('http://example.com/get')
        coro_2 = session.get('http://example.org/tp')
        return combine_responses(*(await asyncio.gather(coro_1, coro_2)))

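Next, I want to have some complex dependency structure. I want to start operations when I have all prerequisites for them, and get results when I need the results. Here asyncio.ensure_future helps: it turns a coroutine into a separate task that the event loop manages on its own: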

async def do_the_job():
    async with aiohttp.ClientSession() as session:
        fut_1 = asyncio.ensure_future(session.get('http://httpbin.org/ip'))
        coro_2 = session.get('http://httpbin.org/get')
        coro_3 = session.post('http://httpbin.org/post', data=(await coro_2))
        coro_3_result = await coro_3
        return combine_responses(await fut_1, coro_3_result)

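Is it true that, to achieve parallel non-blocking IO with coroutines in my logic flow, I have to use either asyncio.ensure_future or asyncio.gather (which actually uses asyncio.ensure_future)? Is there a less "verbose" way?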

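Is it true that normally developers have to think about which coroutines should become separate tasks, and use the aforementioned functions to gain optimal performance?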

Is there a point in using coroutines without multiple tasks in the event loop?

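How "heavy" are event loop tasks in real life? Surely, they're "lighter" than OS threads or processes. To what extent should I strive for the minimal possible number of such tasks?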

I need to make two unrelated HTTP requests in parallel. What's the simplest way to do that?

import asyncio
import aiohttp


async def request(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.text()


async def main():
    results = await asyncio.gather(
        request('http://httpbin.org/delay/1'),
        request('http://httpbin.org/delay/1'),
    )
    print(len(results))


# asyncio.run() creates the event loop, runs main(), and closes the loop afterwards.
asyncio.run(main())

Yes, you can achieve concurrency with asyncio.gather, or by creating tasks with asyncio.ensure_future.
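To make the two options concrete, here is a minimal sketch that runs the same pair of jobs both ways. It avoids the network entirely: `fake_request` is a hypothetical stand-in that sleeps instead of calling aiohttp, and it uses `asyncio.create_task` (Python 3.7+), which does the same job as `asyncio.ensure_future` when given a coroutine.

```python
import asyncio


async def fake_request(name, delay=0.05):
    # Hypothetical stand-in for an HTTP call: sleeps instead of doing network IO.
    await asyncio.sleep(delay)
    return f"response from {name}"


async def with_gather():
    # gather() schedules both coroutines and returns their results in argument order.
    return await asyncio.gather(fake_request("get"), fake_request("ip"))


async def with_tasks():
    # create_task() schedules each coroutine immediately;
    # the awaits below only collect results that are already in progress.
    task_1 = asyncio.create_task(fake_request("get"))
    task_2 = asyncio.create_task(fake_request("ip"))
    return [await task_1, await task_2]


gather_results = asyncio.run(with_gather())
task_results = asyncio.run(with_tasks())
print(gather_results == task_results)
```

Both variants produce the same list of results; gather is usually the shorter spelling when all the results are needed at the same point.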

Next, I want to have some complex dependency structure. I want to start operations when I have all prerequisites for them and get results when I need the results.

While the code you provided will do the job, it would be nicer to split the concurrent flows into separate coroutines and again use asyncio.gather:

import asyncio
import aiohttp


async def request(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.text()


async def get_ip():
    return await request('http://httpbin.org/ip')


async def post_from_get():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://httpbin.org/get') as resp:
            get_res = await resp.text()
        async with session.post('http://httpbin.org/post', data=get_res) as resp:
            return await resp.text()


async def main():
    results = await asyncio.gather(
        get_ip(),
        post_from_get(),
    )
    print(len(results))


# asyncio.run() creates the event loop, runs main(), and closes the loop afterwards.
asyncio.run(main())
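The interleaving of the two flows can be observed without any network access. The sketch below mirrors the structure of the snippet above, but `fake_io`, its delays, and the `events` log are assumptions made purely for illustration: each simulated operation records its label when it completes.

```python
import asyncio

events = []  # completion order of the simulated IO operations


async def fake_io(label, delay):
    # Hypothetical stand-in for a request: sleep, then record completion.
    await asyncio.sleep(delay)
    events.append(label)
    return label


async def get_ip():
    # Independent flow: a single slow-ish operation.
    return await fake_io("ip", 0.15)


async def post_from_get():
    # Dependent flow: the "post" needs the result of the "get".
    got = await fake_io("get", 0.1)
    return await fake_io(f"post({got})", 0.1)


async def main():
    return await asyncio.gather(get_ip(), post_from_get())


results = asyncio.run(main())
print(events)
print(results)
```

With these delays the "ip" operation completes between the "get" and the "post", which shows that the independent flow keeps progressing while the dependent chain is waiting, even though neither coroutine mentions the other.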

Is it true that normally developers have to think what coroutines should become separate tasks and use aforementioned functions to gain optimal performance?

Since you use asyncio, you probably want to run some jobs concurrently to gain performance, right? asyncio.gather is a way of saying "run these jobs concurrently to get their results faster".

If you don't have to think about which jobs should run concurrently to gain performance, you may be fine with plain synchronous code.
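The difference this choice makes can be measured with a small sketch. It assumes simulated IO (`fake_io` just sleeps) and a hypothetical `timed` helper; exact timings will vary by machine.

```python
import asyncio
import time


async def fake_io(delay):
    # Hypothetical stand-in for a network call.
    await asyncio.sleep(delay)
    return delay


async def sequential():
    # The second coroutine does not start until the first await has finished.
    return [await fake_io(0.2), await fake_io(0.2)]


async def concurrent():
    # Both coroutines are scheduled together, so their waits overlap.
    return await asyncio.gather(fake_io(0.2), fake_io(0.2))


def timed(coro_func):
    # Run a coroutine function in a fresh event loop and measure wall-clock time.
    start = time.perf_counter()
    result = asyncio.run(coro_func())
    return result, time.perf_counter() - start


seq_result, seq_time = timed(sequential)
con_result, con_time = timed(concurrent)
print(f"sequential: {seq_time:.2f}s, concurrent: {con_time:.2f}s")
```

The sequential version should take roughly the sum of the delays and the concurrent one roughly the longest single delay. Writing `await a, await b` on plain coroutines behaves like the sequential version.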

Is there a point in using coroutines without multiple tasks in event loop?

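In your code you don't have to create tasks manually if you don't need to: neither snippet in this answer uses asyncio.ensure_future. Internally, though, asyncio uses tasks constantly (for example, as you noted, asyncio.gather uses tasks itself).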
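One way to see those internal tasks is to ask each coroutine which task it is running in. This is a small sketch with a hypothetical `whoami` coroutine, assuming the default task factory.

```python
import asyncio


async def whoami():
    # Report the Task that is currently executing this coroutine.
    return asyncio.current_task()


async def main():
    main_task = asyncio.current_task()
    # A plain await runs the coroutine inside the caller's task.
    direct = await whoami()
    # gather() wraps each coroutine in its own Task.
    gathered = await asyncio.gather(whoami(), whoami())
    return main_task, direct, gathered


main_task, direct, gathered = asyncio.run(main())
print(direct is main_task)
print(all(task is not main_task for task in gathered))
print(gathered[0] is not gathered[1])
```

All three lines should print True: awaiting a coroutine directly creates no new task, while each coroutine passed to gather gets a separate one.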

How "heavy" are event loop tasks in real life? Surely, they're "lighter" than OS threads or processes. To what extent should I strive for minimal possible number of such tasks?

The main bottleneck of an async program is (almost always) the network: you shouldn't worry about the number of asyncio coroutines/tasks at all.
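For a rough feel of task overhead, the following sketch creates ten thousand trivial tasks and times the whole run. The task count and the `tiny_job` coroutine are arbitrary choices for illustration, and the measured time depends on the machine, so treat the number as an order-of-magnitude hint rather than a benchmark.

```python
import asyncio
import time


async def tiny_job(i):
    # Yield to the event loop once, then return; no real IO involved.
    await asyncio.sleep(0)
    return i


async def main(n):
    # One Task per job, all alive at the same time.
    tasks = [asyncio.create_task(tiny_job(i)) for i in range(n)]
    return await asyncio.gather(*tasks)


start = time.perf_counter()
results = asyncio.run(main(10_000))
elapsed = time.perf_counter() - start
print(len(results), f"{elapsed:.3f}s")
```

On typical hardware this is likely to finish in well under the time of a single real HTTP round trip, which is the point of the answer above: the cost of the tasks themselves is rarely what limits an IO-bound program.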