如何在异步的不同线程中 运行 `loop_in_executor`?

How to run `loop_in_executor` in different threads for asyncio?

所以,假设我们有一个同步方法,如下所示:

def sync_method(param1, param2):
    # Complex method logic
    return "Completed"

我想 运行 在当前事件循环中 run_in_executor 下的不同异步方法中使用上述方法。举例如下:

async def run_sync_in_executor(param1, param2, pool=None):
    loop = asyncio.get_event_loop()
    value = loop.run_in_executor(pool, sync_method, param1, param2)
    # Some further changes to the variable `value`
    return value

现在,我想 运行 在遍历参数列表的同时使用上述方法,并最终修改最终输出。 一种我认为可行但行不通的方法是使用 asyncio.gather:

def main():
    params_list = [[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6, 7], [7, 8], [8, 9], [9, 10]]
    output = await asyncio.gather(*[run_sync_in_executor(v[0], v[1]) for v in params_list])

当我阅读文档并理解时,这不起作用的原因是方法 run_sync_in_executor 正在尝试访问当前事件循环,该循环由 [ 的所有不同执行共享=19=]。因为,每个事件循环只能有一个线程,甚至在第一个循环结束之前,由于 gather 的性质,以下方法试图访问事件循环,这会导致错误。

作为解决方案,我想到了使用 ThreadPoolExecutor,它可能会根据 num_workers 子句创建线程数,其中每个方法在执行时都可以使用 pool .我期待这样的事情:

with ThreadPoolExecutor(num_workers=8) as executor:
    for param in params_list:
        future = executor.submit(run_sync_in_executor, param[0], param[1], executor)
        print(future.result())

但是上面的方法不行。 如果有人可以建议我实现预期目标的最佳方法是什么,那就太好了?

你的代码有几个错误:你没有等待 run_in_executormain 应该是异步函数。工作解决方案:

import asyncio
import time


def sync_method(param1, param2):
    """Some sync function"""
    time.sleep(5)
    return param1 + param2 + 10000


async def ticker():
    """Just to show that sync method does not block async loop"""
    while True:
        await asyncio.sleep(1)
        print("Working...")


async def run_sync_in_executor(param1, param2, pool=None):
    """Wrapper around run in executor"""
    loop = asyncio.get_event_loop()
    # run_in_executor should be awaited, otherwise run_in_executor
    # just returns coroutine (not its result!)
    value = await loop.run_in_executor(pool, sync_method, param1, param2)
    return value


async def amain():
    """Main should be async function !"""
    params_list = [[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6, 7], [7, 8], [8, 9], [9, 10]]
    asyncio.create_task(ticker()) # runs in parallel, never awaited!
    output = await asyncio.gather(*[run_sync_in_executor(v[0], v[1]) for v in params_list])
    print(output)

if __name__ == '__main__':
    asyncio.run(amain())