使用 asyncio 并行工作的正确方法
Right way to parallelize work with asyncio
SO 上有很多帖子询问有关 asyncio 的具体问题,但我无法掌握在给定情况下使用什么的正确方法。
假设我想并行解析和抓取多个网页。我可以使用 asyncio 以至少 3 种不同的方式执行此操作:
与 pool.submit
:
with ThreadPoolExecutor(max_workers=10) as pool:
result_futures = list(map(lambda x: pool.submit(my_func, x), my_list))
for future in as_completed(result_futures):
results.append(future.result())
return results
与asyncio.gather
:
loop = asyncio.get_running_loop()
with ThreadPoolExecutor(max_workers=10) as pool:
futures = [loop.run_in_executor(pool, my_func, x) for x in my_list]
results = await asyncio.gather(*futures)
只有 pool.map
:
with ThreadPoolExecutor(max_workers=10) as pool:
results = [x for x in pool.map(my_func, arg_list)]
my_func
类似于
async def my_func(arg):
async with aiohttp.ClientSession() as session:
async with session.post(...):
...
有人可以帮助我了解这 3 种方法之间的区别吗?我知道我可以,例如,在第一个中独立处理异常,但还有其他区别吗?
None 个。 ThreadPoolExecutor
和 run_in_executor
都会在另一个线程中执行您的代码,无论您使用 asyncio 循环来监视它们的执行。到那时,您可能根本不使用 asyncio:async 的想法恰恰是在单个线程上管理 运行 一切 - 获得一些 CPU 周期并在竞争条件下缓解很多出现在多线程代码上。
如果你的 my_func
一直正确使用 async(看起来是这样,但代码不完整),你必须为每次调用创建一个 asyncio Task
“异步定义”功能。在那一点上,也许最短路径确实是使用 asyncio.gather
:
import asyncio
import aiohttp, ... # things used inside "my_func"
def my_func(x):
...
my_list = ...
results = asyncio.run(asyncio.gather(*(my_func(x) for x in my_list)))
仅此而已。
现在回到您的代码,检查差异:
您的代码几乎是偶然工作的,例如,您实际上只是将异步函数及其参数传递给线程池执行程序:以这种方式调用任何异步函数时,它们立即 return,没有完成任何工作。这意味着在您的线程池执行程序中没有执行任何内容(但一些用于创建协同例程的薄样板内部代码)。由目标线程中 运行 的调用(即实际的 my_func(x)
调用)编辑的值 return 是“协程”:这些是要等待的对象在主线程中,这将实际执行网络 I/O。那就是:你的“my_func”是一个“协同例程函数”,当调用它时立即用一个“协同例程对象”返回运行s。当协程对象被等待时,“my_func”中的代码被实际执行。
现在,把它排除在外:在你的第一个代码片段中,你在 concurrent.futures 未来调用 future.result
:那只会给你协程对象:该代码不起作用- 如果你写 results.append(await future.result())
那么,是的,如果执行中没有异常,它会工作,但会按顺序进行所有调用:“await”停止当前线程的执行,直到等待的对象解决,并且由于等待其他结果会发生在同一代码中,因此它们将排队并按顺序执行,并行度为零。
您的 pool.map
代码做同样的事情,而您的 asyncio.gather
代码以不同的方式出错:loop.run_in_executor
代码将接听您的电话并 运行另一个线程,并为您提供一个适合与 gather
一起使用的可等待对象。但是,等待它 return 您将成为“协程对象”,而不是 HTTP 调用的结果。
关于在并行代码中引发异常的真正选择是使用 asyncio.gather
、asyncio.wait
或 asyncio.as_completed
。在此处查看文档:https://docs.python.org/3/library/asyncio-task.html
SO 上有很多帖子询问有关 asyncio 的具体问题,但我无法掌握在给定情况下使用什么的正确方法。
假设我想并行解析和抓取多个网页。我可以使用 asyncio 以至少 3 种不同的方式执行此操作:
与 pool.submit
:
with ThreadPoolExecutor(max_workers=10) as pool:
result_futures = list(map(lambda x: pool.submit(my_func, x), my_list))
for future in as_completed(result_futures):
results.append(future.result())
return results
与asyncio.gather
:
loop = asyncio.get_running_loop()
with ThreadPoolExecutor(max_workers=10) as pool:
futures = [loop.run_in_executor(pool, my_func, x) for x in my_list]
results = await asyncio.gather(*futures)
只有 pool.map
:
with ThreadPoolExecutor(max_workers=10) as pool:
results = [x for x in pool.map(my_func, arg_list)]
my_func
类似于
async def my_func(arg):
async with aiohttp.ClientSession() as session:
async with session.post(...):
...
有人可以帮助我了解这 3 种方法之间的区别吗?我知道我可以,例如,在第一个中独立处理异常,但还有其他区别吗?
None 个。 ThreadPoolExecutor
和 run_in_executor
都会在另一个线程中执行您的代码,无论您使用 asyncio 循环来监视它们的执行。到那时,您可能根本不使用 asyncio:async 的想法恰恰是在单个线程上管理 运行 一切 - 获得一些 CPU 周期并在竞争条件下缓解很多出现在多线程代码上。
如果你的 my_func
一直正确使用 async(看起来是这样,但代码不完整),你必须为每次调用创建一个 asyncio Task
“异步定义”功能。在那一点上,也许最短路径确实是使用 asyncio.gather
:
import asyncio
import aiohttp, ... # things used inside "my_func"
def my_func(x):
...
my_list = ...
results = asyncio.run(asyncio.gather(*(my_func(x) for x in my_list)))
仅此而已。
现在回到您的代码,检查差异:
您的代码几乎是偶然工作的,例如,您实际上只是将异步函数及其参数传递给线程池执行程序:以这种方式调用任何异步函数时,它们立即 return,没有完成任何工作。这意味着在您的线程池执行程序中没有执行任何内容(但一些用于创建协同例程的薄样板内部代码)。由目标线程中 运行 的调用(即实际的 my_func(x)
调用)编辑的值 return 是“协程”:这些是要等待的对象在主线程中,这将实际执行网络 I/O。那就是:你的“my_func”是一个“协同例程函数”,当调用它时立即用一个“协同例程对象”返回运行s。当协程对象被等待时,“my_func”中的代码被实际执行。
现在,把它排除在外:在你的第一个代码片段中,你在 concurrent.futures 未来调用 future.result
:那只会给你协程对象:该代码不起作用- 如果你写 results.append(await future.result())
那么,是的,如果执行中没有异常,它会工作,但会按顺序进行所有调用:“await”停止当前线程的执行,直到等待的对象解决,并且由于等待其他结果会发生在同一代码中,因此它们将排队并按顺序执行,并行度为零。
您的 pool.map
代码做同样的事情,而您的 asyncio.gather
代码以不同的方式出错:loop.run_in_executor
代码将接听您的电话并 运行另一个线程,并为您提供一个适合与 gather
一起使用的可等待对象。但是,等待它 return 您将成为“协程对象”,而不是 HTTP 调用的结果。
关于在并行代码中引发异常的真正选择是使用 asyncio.gather
、asyncio.wait
或 asyncio.as_completed
。在此处查看文档:https://docs.python.org/3/library/asyncio-task.html