Python asyncio - 如果被调用的函数与 return 无关,如何使用

Python asyncio - how to use if function being called has nothing to return

我有 200 对差异路径。我写了一个小函数来区分每一对并更新一个字典,它本身就是函数的参数之一。假设 MY_DIFFER 是我在后台通过 subprocess 调用的一些差异工具。

async def do_diff(path1, path2, result):
    result[f"{path1} {path2}"] = MY_DIFFER(path1, path2)

如您所见,我对这个异步函数 return 没有任何了解。我只是在 result.

中捕获结果

我在其他地方使用 asyncio 并行调用此函数,如下所示:

path_tuples = [("/path11", "/path12"), ("/path21", "/path22"), ... ]
result = {}
loop = asyncio.get_event_loop()
loop.run_until_complete(
    asyncio.gather(
        *(do_diff(path1, path2, result) for path1, path2 in path_tuples)
    )
)

问题:

  1. 我不知道把await放在do_diff函数的什么地方。但是代码似乎也可以在没有它的情况下工作。
  2. 我不确定差异是否真的并行发生,因为当我在另一个终端查看 ps -eaf 的输出时,我一次只看到我正在调用的基础工具的一个实例.
  3. 执行速度与我按顺序执行差异时的速度相同

所以我显然做错了什么。我怎样才能真正并行地进行比较?

PS: 我在Python3.6

请记住,asyncio 不会 运行 并行 事情 ,它 运行 并行 事情 ,使用协作多任务模型——这意味着协程需要显式地让出时间给其他协程,以便它们 运行。这就是 await 命令的作用;它说“在我等待完成某些事情的同时去 运行 一些其他协程”。

如果您从未 awaiting 处理某事,则您不会获得并发执行。

你想要的是你的 do_diff 方法能够 await 执行你的外部工具,但你不能只用 subprocess 模块做到这一点.您 可以 使用 run_in_executor 方法来做到这一点,该方法在单独的线程或进程中安排 运行 同步命令(例如 subprocess.run),并且异步等待结果。这可能看起来像:

async def do_diff(path1, path2, result):
    loop = asyncio.get_event_loop()
    result[f"{path1} {path2}"] = await loop.run_in_executor(None, MY_DIFFER, path1, path2)

默认情况下,这将 运行 MY_DIFFER 在一个单独的线程中,尽管您可以使用一个单独的进程,而不是通过将显式执行程序作为第一个参数传递给 run_in_executor


根据我的评论,用 concurrent.futures 解决这个问题可能看起来像这样:

import concurrent.futures
import time


# dummy function that just sleeps for 2 seconds
# replace this with your actual code
def do_diff(path1, path2):
    print(f"diffing path {path1} and {path2}")
    time.sleep(2)
    return path1, path2, "information about diff"


# create 200 path tuples for demonstration purposes
path_tuples = [(f"/path{x}.1", f"/path{x}.2") for x in range(200)]
futures = []

with concurrent.futures.ProcessPoolExecutor(max_workers=100) as executor:
    for path1, path2 in path_tuples:
        # submit the job to the executor
        futures.append(executor.submit(do_diff, path1, path2))

# read the results
for future in futures:
    print(future.result())