Python concurrent.futures 使用子进程,运行 多个 python 脚本

Python concurrent.futures using subprocess, running several python script

我想 运行 多个 python 脚本同时使用 concurrent.futures。 我的代码的串行版本在文件夹中查找特定的 python 文件并执行它。

import re
import os
import glob
import re
from glob import glob
import concurrent.futures as cf

FileList = [];
import time
FileList = [];
start_dir = os.getcwd();
pattern   = "Read.py"

for dir,_,_ in os.walk(start_dir):
    FileList.extend(glob(os.path.join(dir,pattern))) ;

FileList

i=0
for file in FileList:
    dir=os.path.dirname((file))
    dirname1 = os.path.basename(dir) 
    print(dirname1)
    i=i+1
    Str='python '+ file
    print(Str)
    completed_process = subprocess.run(Str)`

对于我的代码的并行版本:

    def Python_callback(future):
    print(future.run_type, future.jid)
    return "One Folder finished executing"

def Python_execute():
    from concurrent.futures import ProcessPoolExecutor as Pool
    args = FileList
    pool = Pool(max_workers=1)
    future = pool.submit(subprocess.call, args, shell=1)
    future.run_type = "run_type"
    future.jid = FileList
    future.add_done_callback(Python_callback)
    print("Python executed")

if __name__ == '__main__':
    import subprocess
    Python_execute()

问题是我不确定如何将 FileList 的每个元素传递给分隔 cpu

提前感谢您的帮助

最小的变化是对每个元素使用一次 submit,而不是对整个列表使用一次:

futures = []
for file in FileList:
    future = pool.submit(subprocess.call, file, shell=1)
    future.blah blah
    futures.append(future)

futures 列表仅在您想对期货做某事时才是必需的——等待它们完成,检查它们的 return 值,等等。

与此同时,您正在使用 max_workers=1 显式创建池。毫不奇怪,这意味着您只会获得 1 个工作子进程,因此它最终会等待一个子进程完成,然后再获取下一个子进程。如果你想同时 运行 它们,删除那个 max_workers 并让它默认为每个核心一个(或者传递 max_workers=8 或其他一些不是 1 的数字,如果你有充分的理由覆盖默认值)。


虽然我们正在做这件事,但有很多方法可以简化您正在做的事情:

  • 这里真的需要multiprocessing吗?如果您需要与每个子进程进行通信,那么在单个线程中进行通信可能会很痛苦 — 但线程,或者 asyncio 可以像这里的进程一样工作。
  • 更重要的是,看起来您实际上不需要任何东西,只需启动进程并等待它完成,这可以用简单的同步代码完成。
  • 为什么要构建字符串并使用 shell=1 而不是仅传递列表而不使用 shell?使用 shell 会不必要地产生开销、安全问题和调试烦恼。
  • 您真的不需要每个 future 上的 jid — 它只是您所有调用字符串的列表,这没有用。可能更有用的是某种标识符,或者子进程 return 代码,或者……可能还有很多其他东西,但它们都是可以通过读取 return 值 subprocess.call 或一个简单的包装器。
  • 你也真的不需要回调。如果您只是将所有期货收集在一个列表中并as_completed它,您可以更简单地打印结果。
  • 如果您同时执行以上两项操作,那么您在循环中只剩下一个 pool.submit——这意味着您可以将整个循环替换为 pool.map
  • 您很少需要或不想混合使用 os.walkglob。当您实际拥有 glob 模式时,将 fnmatch 应用于 os.walk 中的 files 列表。但是在这里,您只是在每个目录中查找特定的文件名,所以实际上,您需要过滤的只是 file == 'Read.py'.
  • 您没有在循环中使用 i。但是如果你确实需要它,最好做 for i, file in enumerate(FileList): 而不是 for file in FileList: 并手动增加 i.