在没有 run_in_executor of asyncio.get_event_loop() 的异步方法中使用线程池

Using Threadpool in an Async method without run_in_executor of asyncio.get_event_loop()

以下是我的代码,它 运行 是使用 Thread Pool from Concurrent.Futures Package 的异步方法的长 IO 操作,如下所示:

# io_bound/threaded.py
import concurrent.futures as futures
import requests
import threading
import time
import asyncio

data = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20]

def sleepy(n):
    time.sleep(n//2)
    return n*2

async def ExecuteSleep():
    l = len(data)
    results = []
    # Submit needs explicit mapping of I/p and O/p
    # Output may not be in the same Order
    with futures.ThreadPoolExecutor(max_workers=l) as executor:
        result_futures = {d:executor.submit(sleepy,d) for d in data}
        results = {d:result_futures[d].result() for d in data}
    
    return results

if __name__ == '__main__':
    print("Starting ...")
    t1 = time.time()
    result  = asyncio.run(ExecuteSleep())
    print(result)
    print("Finished ...")
    t2 = time.time()
    print(t2-t1)

以下是我的问题:

  1. 如果我不使用以下 asyncio api 直接 运行 Threadpool 可能会出现什么潜在问题:

    loop = asyncio.get_event_loop()
    loop.run_in_executor(...)
    

我已经查看了文档,运行 简单的测试用例对我来说这看起来非常好,它将 运行 使用自定义线程池在后台进行 IO 操作,如所列 here,我当然不能使用纯异步等待来接收输出,必须使用 map 或提交方法来管理调用,除此之外,我在这里看不到负数。

Ideone link 我的代码 https://ideone.com/lDVLFh

What could be the potential issue if I run the Threadpool directly

如果您只是将内容提交到您的线程池并且从不与其交互或等待结果,则没有问题。但是您的代码确实在等待结果¹。

问题是 ExecuteSleep 阻塞。虽然它被定义为 async def,但它在名称上是异步的,只是因为它不等待任何东西。虽然它 运行s,但没有其他 asyncio 协程可以 运行,因此它破坏了 asyncio 的主要优势,即同时 运行 多个协程。


¹ 即使您删除了对 `result()` 的调用,`with` 语句也会等待工作人员完成他们的工作以便能够终止他们。如果您希望同步功能 运行 完全在后台进行,您可以将池设为全局池,而不使用“with”来管理它。