如何在异步的不同线程中 运行 `loop_in_executor`?
How to run `loop_in_executor` in different threads for asyncio?
所以,假设我们有一个同步方法,如下所示:
def sync_method(param1, param2):
# Complex method logic
return "Completed"
我想 运行 在当前事件循环中 run_in_executor
下的不同异步方法中使用上述方法。举例如下:
async def run_sync_in_executor(param1, param2, pool=None):
loop = asyncio.get_event_loop()
value = loop.run_in_executor(pool, sync_method, param1, param2)
# Some further changes to the variable `value`
return value
现在,我想 运行 在遍历参数列表的同时使用上述方法,并最终修改最终输出。
一种我认为可行但行不通的方法是使用 asyncio.gather
:
def main():
params_list = [[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6, 7], [7, 8], [8, 9], [9, 10]]
output = await asyncio.gather(*[run_sync_in_executor(v[0], v[1]) for v in params_list])
当我阅读文档并理解时,这不起作用的原因是方法 run_sync_in_executor
正在尝试访问当前事件循环,该循环由 [ 的所有不同执行共享=19=]。因为,每个事件循环只能有一个线程,甚至在第一个循环结束之前,由于 gather
的性质,以下方法试图访问事件循环,这会导致错误。
作为解决方案,我想到了使用 ThreadPoolExecutor
,它可能会根据 num_workers
子句创建线程数,其中每个方法在执行时都可以使用 pool
.我期待这样的事情:
with ThreadPoolExecutor(num_workers=8) as executor:
for param in params_list:
future = executor.submit(run_sync_in_executor, param[0], param[1], executor)
print(future.result())
但是上面的方法不行。
如果有人可以建议我实现预期目标的最佳方法是什么,那就太好了?
你的代码有几个错误:你没有等待 run_in_executor
,main
应该是异步函数。工作解决方案:
import asyncio
import time
def sync_method(param1, param2):
"""Some sync function"""
time.sleep(5)
return param1 + param2 + 10000
async def ticker():
"""Just to show that sync method does not block async loop"""
while True:
await asyncio.sleep(1)
print("Working...")
async def run_sync_in_executor(param1, param2, pool=None):
"""Wrapper around run in executor"""
loop = asyncio.get_event_loop()
# run_in_executor should be awaited, otherwise run_in_executor
# just returns coroutine (not its result!)
value = await loop.run_in_executor(pool, sync_method, param1, param2)
return value
async def amain():
"""Main should be async function !"""
params_list = [[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6, 7], [7, 8], [8, 9], [9, 10]]
asyncio.create_task(ticker()) # runs in parallel, never awaited!
output = await asyncio.gather(*[run_sync_in_executor(v[0], v[1]) for v in params_list])
print(output)
if __name__ == '__main__':
asyncio.run(amain())
所以,假设我们有一个同步方法,如下所示:
def sync_method(param1, param2):
# Complex method logic
return "Completed"
我想 运行 在当前事件循环中 run_in_executor
下的不同异步方法中使用上述方法。举例如下:
async def run_sync_in_executor(param1, param2, pool=None):
loop = asyncio.get_event_loop()
value = loop.run_in_executor(pool, sync_method, param1, param2)
# Some further changes to the variable `value`
return value
现在,我想 运行 在遍历参数列表的同时使用上述方法,并最终修改最终输出。
一种我认为可行但行不通的方法是使用 asyncio.gather
:
def main():
params_list = [[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6, 7], [7, 8], [8, 9], [9, 10]]
output = await asyncio.gather(*[run_sync_in_executor(v[0], v[1]) for v in params_list])
当我阅读文档并理解时,这不起作用的原因是方法 run_sync_in_executor
正在尝试访问当前事件循环,该循环由 [ 的所有不同执行共享=19=]。因为,每个事件循环只能有一个线程,甚至在第一个循环结束之前,由于 gather
的性质,以下方法试图访问事件循环,这会导致错误。
作为解决方案,我想到了使用 ThreadPoolExecutor
,它可能会根据 num_workers
子句创建线程数,其中每个方法在执行时都可以使用 pool
.我期待这样的事情:
with ThreadPoolExecutor(num_workers=8) as executor:
for param in params_list:
future = executor.submit(run_sync_in_executor, param[0], param[1], executor)
print(future.result())
但是上面的方法不行。 如果有人可以建议我实现预期目标的最佳方法是什么,那就太好了?
你的代码有几个错误:你没有等待 run_in_executor
,main
应该是异步函数。工作解决方案:
import asyncio
import time
def sync_method(param1, param2):
"""Some sync function"""
time.sleep(5)
return param1 + param2 + 10000
async def ticker():
"""Just to show that sync method does not block async loop"""
while True:
await asyncio.sleep(1)
print("Working...")
async def run_sync_in_executor(param1, param2, pool=None):
"""Wrapper around run in executor"""
loop = asyncio.get_event_loop()
# run_in_executor should be awaited, otherwise run_in_executor
# just returns coroutine (not its result!)
value = await loop.run_in_executor(pool, sync_method, param1, param2)
return value
async def amain():
"""Main should be async function !"""
params_list = [[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6, 7], [7, 8], [8, 9], [9, 10]]
asyncio.create_task(ticker()) # runs in parallel, never awaited!
output = await asyncio.gather(*[run_sync_in_executor(v[0], v[1]) for v in params_list])
print(output)
if __name__ == '__main__':
asyncio.run(amain())