如何以安全的方式从异步函数调用同步函数
How to call synchronous function(s) from async functions in safe manner
如果一名或多名工人同时呼叫 'Synchronous function' 会发生什么情况?
也许一个或多个工人暂时被封锁?
async def worker(queue):
while True:
queue_out = await queue.get()
file_name = queue_out.file.name
# Create path + file_name
destination_path = create_path(file_name) #<-- SYNC function
await download_medical(queue_out,destination_path)
async def main():
queue_in = asyncio.Queue(1)
workers = [asyncio.create_task(worker(queue_in)) for _ in range(5)]
async for result in get_result(building):
await queue_in.put(result)
def create_path(file_name):
#....#
#operations related to file and folder on the hdd
#creates a folder based on file name
简答:
- 如果您从异步协程中调用同步(阻塞)函数,则循环中并发 运行 的所有任务都将停止,直到该函数 returns.
- 在另一个线程或子进程中使用
loop.run_in_executor(...)
异步 运行 阻塞函数。
async def worker(queue):
loop = Asyncio.get_event_loop() # get a handle to the current run loop
while True:
queue_out = await queue.get()
file_name = queue_out.file.name
# run blocking function in an executor
create_path_task = loop.run_in_executor(None, create_path, file_name)
destination_path = await create_path_task # wait for this task to finish
await download_medical(queue_out, destination_path)
背景:
请注意,异步函数(协程)不会 运行 并行执行任务,它们 运行 并发执行,这可能会同时 运行 出现。考虑这一点的最简单方法是意识到每次 await
被调用时,即在等待结果时,事件循环将暂停当前 运行ning 协程和 运行 另一个协程,直到等待某事等等;因此使其协同并发。
通常在 IO 操作上进行等待,因为它们很耗时并且不 cpu 密集。 CPU 密集操作将阻塞循环直到它完成。另请注意,常规 IO 操作本质上是阻塞的,如果您想从并发中受益,那么您必须使用 Asyncio 兼容库,如 aiofile、aiohttp 等。
关于执行者的更多信息:
运行 定期同步功能而不阻塞事件循环的最简单方法是使用 loop.run_in_executor
。第一个参数从 concurrent.futures
模块中获取一个执行器,如 ThreadPoolExecutor
或 ProcessPoolExecutor
。通过传递 None
,Asyncio 将在默认 ThreadPoolExecutor
中自动 运行 您的函数。如果您的任务是 cpu 密集型任务,请使用 ProcessPoolExecutor
以便它可以真正并行地使用多个 cpu 核心和 运行。
如果一名或多名工人同时呼叫 'Synchronous function' 会发生什么情况? 也许一个或多个工人暂时被封锁?
async def worker(queue):
while True:
queue_out = await queue.get()
file_name = queue_out.file.name
# Create path + file_name
destination_path = create_path(file_name) #<-- SYNC function
await download_medical(queue_out,destination_path)
async def main():
queue_in = asyncio.Queue(1)
workers = [asyncio.create_task(worker(queue_in)) for _ in range(5)]
async for result in get_result(building):
await queue_in.put(result)
def create_path(file_name):
#....#
#operations related to file and folder on the hdd
#creates a folder based on file name
简答:
- 如果您从异步协程中调用同步(阻塞)函数,则循环中并发 运行 的所有任务都将停止,直到该函数 returns.
- 在另一个线程或子进程中使用
loop.run_in_executor(...)
异步 运行 阻塞函数。
async def worker(queue):
loop = Asyncio.get_event_loop() # get a handle to the current run loop
while True:
queue_out = await queue.get()
file_name = queue_out.file.name
# run blocking function in an executor
create_path_task = loop.run_in_executor(None, create_path, file_name)
destination_path = await create_path_task # wait for this task to finish
await download_medical(queue_out, destination_path)
背景:
请注意,异步函数(协程)不会 运行 并行执行任务,它们 运行 并发执行,这可能会同时 运行 出现。考虑这一点的最简单方法是意识到每次 await
被调用时,即在等待结果时,事件循环将暂停当前 运行ning 协程和 运行 另一个协程,直到等待某事等等;因此使其协同并发。
通常在 IO 操作上进行等待,因为它们很耗时并且不 cpu 密集。 CPU 密集操作将阻塞循环直到它完成。另请注意,常规 IO 操作本质上是阻塞的,如果您想从并发中受益,那么您必须使用 Asyncio 兼容库,如 aiofile、aiohttp 等。
关于执行者的更多信息:
运行 定期同步功能而不阻塞事件循环的最简单方法是使用 loop.run_in_executor
。第一个参数从 concurrent.futures
模块中获取一个执行器,如 ThreadPoolExecutor
或 ProcessPoolExecutor
。通过传递 None
,Asyncio 将在默认 ThreadPoolExecutor
中自动 运行 您的函数。如果您的任务是 cpu 密集型任务,请使用 ProcessPoolExecutor
以便它可以真正并行地使用多个 cpu 核心和 运行。