如何以安全的方式从异步函数调用同步函数

How to call synchronous function(s) from async functions in safe manner

如果一名或多名工人同时呼叫 'Synchronous function' 会发生什么情况? 也许一个或多个工人暂时被封锁?

async def worker(queue):
    while True:
        queue_out  = await queue.get()
        file_name = queue_out.file.name
        # Create path + file_name 
        destination_path = create_path(file_name) #<-- SYNC function

        await download_medical(queue_out,destination_path)

async def main():
    queue_in = asyncio.Queue(1)
    workers = [asyncio.create_task(worker(queue_in)) for _ in range(5)]
        
    async for result in get_result(building):
        await queue_in.put(result) 

def create_path(file_name):
    #....#
    #operations related to file and folder on the hdd
    #creates a folder based on file name

简答:

  • 如果您从异步协程中调用同步(阻塞)函数,则循环中并发 运行 的所有任务都将停止,直到该函数 returns.
  • 在另一个线程或子进程中使用 loop.run_in_executor(...) 异步 运行 阻塞函数。
async def worker(queue):
    loop = Asyncio.get_event_loop()  # get a handle to the current run loop
    while True:
        queue_out  = await queue.get()
        file_name = queue_out.file.name
        
        # run blocking function in an executor
        create_path_task = loop.run_in_executor(None, create_path, file_name)
        destination_path = await create_path_task  # wait for this task to finish

        await download_medical(queue_out, destination_path)

背景: 请注意,异步函数(协程)不会 运行 并行执行任务,它们 运行 并发执行,这可能会同时 运行 出现。考虑这一点的最简单方法是意识到每次 await 被调用时,即在等待结果时,事件循环将暂停当前 运行ning 协程和 运行 另一个协程,直到等待某事等等;因此使其协同并发。

通常在 IO 操作上进行等待,因为它们很耗时并且不 cpu 密集。 CPU 密集操作将阻塞循环直到它完成。另请注意,常规 IO 操作本质上是阻塞的,如果您想从并发中受益,那么您必须使用 Asyncio 兼容库,如 aiofile、aiohttp 等。

关于执行者的更多信息: 运行 定期同步功能而不阻塞事件循环的最简单方法是使用 loop.run_in_executor。第一个参数从 concurrent.futures 模块中获取一个执行器,如 ThreadPoolExecutorProcessPoolExecutor。通过传递 None,Asyncio 将在默认 ThreadPoolExecutor 中自动 运行 您的函数。如果您的任务是 cpu 密集型任务,请使用 ProcessPoolExecutor 以便它可以真正并行地使用多个 cpu 核心和 运行。