在没有 run_in_executor of asyncio.get_event_loop() 的异步方法中使用线程池
Using Threadpool in an Async method without run_in_executor of asyncio.get_event_loop()
以下是我的代码,它 运行 是使用 Thread Pool from Concurrent.Futures Package
的异步方法的长 IO 操作,如下所示:
# io_bound/threaded.py
import concurrent.futures as futures
import requests
import threading
import time
import asyncio
data = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20]
def sleepy(n):
time.sleep(n//2)
return n*2
async def ExecuteSleep():
l = len(data)
results = []
# Submit needs explicit mapping of I/p and O/p
# Output may not be in the same Order
with futures.ThreadPoolExecutor(max_workers=l) as executor:
result_futures = {d:executor.submit(sleepy,d) for d in data}
results = {d:result_futures[d].result() for d in data}
return results
if __name__ == '__main__':
print("Starting ...")
t1 = time.time()
result = asyncio.run(ExecuteSleep())
print(result)
print("Finished ...")
t2 = time.time()
print(t2-t1)
以下是我的问题:
如果我不使用以下 asyncio
api 直接 运行 Threadpool 可能会出现什么潜在问题:
loop = asyncio.get_event_loop()
loop.run_in_executor(...)
我已经查看了文档,运行 简单的测试用例对我来说这看起来非常好,它将 运行 使用自定义线程池在后台进行 IO 操作,如所列 here,我当然不能使用纯异步等待来接收输出,必须使用 map 或提交方法来管理调用,除此之外,我在这里看不到负数。
Ideone link 我的代码 https://ideone.com/lDVLFh
What could be the potential issue if I run the Threadpool directly
如果您只是将内容提交到您的线程池并且从不与其交互或等待结果,则没有问题。但是您的代码确实在等待结果¹。
问题是 ExecuteSleep
阻塞。虽然它被定义为 async def
,但它在名称上是异步的,只是因为它不等待任何东西。虽然它 运行s,但没有其他 asyncio 协程可以 运行,因此它破坏了 asyncio 的主要优势,即同时 运行 多个协程。
¹ 即使您删除了对 `result()` 的调用,`with` 语句也会等待工作人员完成他们的工作以便能够终止他们。如果您希望同步功能 运行 完全在后台进行,您可以将池设为全局池,而不使用“with”来管理它。
以下是我的代码,它 运行 是使用 Thread Pool from Concurrent.Futures Package
的异步方法的长 IO 操作,如下所示:
# io_bound/threaded.py
import concurrent.futures as futures
import requests
import threading
import time
import asyncio
data = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20]
def sleepy(n):
time.sleep(n//2)
return n*2
async def ExecuteSleep():
l = len(data)
results = []
# Submit needs explicit mapping of I/p and O/p
# Output may not be in the same Order
with futures.ThreadPoolExecutor(max_workers=l) as executor:
result_futures = {d:executor.submit(sleepy,d) for d in data}
results = {d:result_futures[d].result() for d in data}
return results
if __name__ == '__main__':
print("Starting ...")
t1 = time.time()
result = asyncio.run(ExecuteSleep())
print(result)
print("Finished ...")
t2 = time.time()
print(t2-t1)
以下是我的问题:
如果我不使用以下
asyncio
api 直接 运行 Threadpool 可能会出现什么潜在问题:loop = asyncio.get_event_loop() loop.run_in_executor(...)
我已经查看了文档,运行 简单的测试用例对我来说这看起来非常好,它将 运行 使用自定义线程池在后台进行 IO 操作,如所列 here,我当然不能使用纯异步等待来接收输出,必须使用 map 或提交方法来管理调用,除此之外,我在这里看不到负数。
Ideone link 我的代码 https://ideone.com/lDVLFh
What could be the potential issue if I run the Threadpool directly
如果您只是将内容提交到您的线程池并且从不与其交互或等待结果,则没有问题。但是您的代码确实在等待结果¹。
问题是 ExecuteSleep
阻塞。虽然它被定义为 async def
,但它在名称上是异步的,只是因为它不等待任何东西。虽然它 运行s,但没有其他 asyncio 协程可以 运行,因此它破坏了 asyncio 的主要优势,即同时 运行 多个协程。
¹ 即使您删除了对 `result()` 的调用,`with` 语句也会等待工作人员完成他们的工作以便能够终止他们。如果您希望同步功能 运行 完全在后台进行,您可以将池设为全局池,而不使用“with”来管理它。