使用 asyncio 并行工作的正确方法

Right way to parallelize work with asyncio

SO 上有很多帖子询问有关 asyncio 的具体问题,但我无法掌握在给定情况下使用什么的正确方法。

假设我想并行解析和抓取多个网页。我可以使用 asyncio 以至少 3 种不同的方式执行此操作:

pool.submit:

with ThreadPoolExecutor(max_workers=10) as pool:
    result_futures = list(map(lambda x: pool.submit(my_func, x), my_list))
    for future in as_completed(result_futures):
        results.append(future.result())
return results

asyncio.gather:

loop = asyncio.get_running_loop()
with ThreadPoolExecutor(max_workers=10) as pool:
     futures = [loop.run_in_executor(pool, my_func, x) for x in my_list]
     results = await asyncio.gather(*futures)

只有 pool.map:

with ThreadPoolExecutor(max_workers=10) as pool:
     results = [x for x in pool.map(my_func, arg_list)]

my_func 类似于

async def my_func(arg):
    async with aiohttp.ClientSession() as session:
        async with session.post(...):
            ...

有人可以帮助我了解这 3 种方法之间的区别吗?我知道我可以,例如,在第一个中独立处理异常,但还有其他区别吗?

None 个。 ThreadPoolExecutorrun_in_executor 都会在另一个线程中执行您的代码,无论您使用 asyncio 循环来监视它们的执行。到那时,您可能根本不使用 asyncio:async 的想法恰恰是在单个线程上管理 运行 一切 - 获得一些 CPU 周期并在竞争条件下缓解很多出现在多线程代码上。

如果你的 my_func 一直正确使用 async(看起来是这样,但代码不完整),你必须为每次调用创建一个 asyncio Task “异步定义”功能。在那一点上,也许最短路径确实是使用 asyncio.gather:

import asyncio
import aiohttp, ... # things used inside "my_func"

def my_func(x):
    ...

my_list = ...

results = asyncio.run(asyncio.gather(*(my_func(x) for x in my_list)))

仅此而已。

现在回到您的代码,检查差异: 您的代码几乎是偶然工作的,例如,您实际上只是将异步函数及其参数传递给线程池执行程序:以这种方式调用任何异步函数时,它们立即 return,没有完成任何工作。这意味着在您的线程池执行程序中没有执行任何内容(但一些用于创建协同例程的薄样板内部代码)。由目标线程中 运行 的调用(即实际的 my_func(x) 调用)编辑的值 return 是“协程”:这些是要等待的对象在主线程中,这将实际执行网络 I/O。那就是:你的“my_func”是一个“协同例程函数”,当调用它时立即用一个“协同例程对象”返回运行s。当协程对象被等待时,“my_func”中的代码被实际执行。

现在,把它排除在外:在你的第一个代码片段中,你在 concurrent.futures 未来调用 future.result:那只会给你协程对象:该代码不起作用- 如果你写 results.append(await future.result()) 那么,是的,如果执行中没有异常,它会工作,但会按顺序进行所有调用:“await”停止当前线程的执行,直到等待的对象解决,并且由于等待其他结果会发生在同一代码中,因此它们将排队并按顺序执行,并行度为零。

您的 pool.map 代码做同样的事情,而您的 asyncio.gather 代码以不同的方式出错:loop.run_in_executor 代码将接听您的电话并 运行另一个线程,并为您提供一个适合与 gather 一起使用的可等待对象。但是,等待它 return 您将成为“协程对象”,而不是 HTTP 调用的结果。

关于在并行代码中引发异常的真正选择是使用 asyncio.gatherasyncio.waitasyncio.as_completed。在此处查看文档:https://docs.python.org/3/library/asyncio-task.html