Python: store results of ProcessPoolExecutor

I'm fairly new to parallel processing with "concurrent.futures". The code seems to work, but I'm not sure how to store the result of each process so that, if any of them returns a non-zero value, the build is ultimately marked as failed. I tried creating a list (exit_status) and appending the results to it, but that raised an IndexError. How should I go about this?

#!/usr/bin/env python3

import concurrent.futures
import sys
import shutil
import os
import glob
import multiprocessing as mp
import json
from os import path


def slave(path1, path2, target):
    os.makedirs(target)
    shutil.copy(path1, target)
    shutil.copy(path2, target)
    os.system(<Login command>)
    os.system(<Image creation command>)
    os.system(<Copy to Other slaves or NFS>)
    
    # If any of the above operations or commands fails in any of the processes,
    # the script should exit with 1 at the end of execution, i.e. fail the build.


def main():
    processed = {}
    exit_status = []
    with open('example.json', 'r') as f:
        data = json.load(f)
        for value in data.items():
            for line in value[1]:
                if line.endswith('.zip'):
                    targz = line
                elif line.endswith('.yaml'):
                    yaml = line
            processed[targz] = yaml


    with concurrent.futures.ProcessPoolExecutor() as executor:
        for id, (path2, path1) in enumerate(processed.items(), 1):
            target = path.join("/tmp", "dir" + str(id))
            ret = executor.submit(slave, path1, path2, target)
            exit_status.append(ret.result())

    for i in exit_status:
        print("##########Result status: ", i)
    
if __name__ == "__main__":
    mp.set_start_method('spawn')
    main()

Output of the exit_status list:

##########Result status:  None
##########Result status:  None

Re: comment

If you want the result of a system call so you can act on it, subprocess.run is more flexible and powerful than os.system. (Your results all print as None because slave() never returns anything.) Also, if you really want the work to run in parallel, you can't wait on result() right after each task is submitted; otherwise you only ever do one thing at a time. It's better to submit all the tasks and collect the Future objects, then iterate over those and wait on each result() once you have submitted all the work you want the executor to do.
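For reference, a minimal sketch of what each call hands back (the "ls /tmp" command here is only an illustrative placeholder, not from the original); the rewritten task function follows after it:

import os
import subprocess

# os.system only returns the command's (platform-dependent) exit status as an int.
status = os.system("ls /tmp")
print(status)

# subprocess.run returns a CompletedProcess object with returncode, stdout and
# stderr, so you can inspect the output as well as the exit code.
proc = subprocess.run(["ls", "/tmp"], text=True, capture_output=True)
print(proc.returncode, proc.stdout)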

def target_func(path1, path2, target):
    #...
    #instead of os.system, use subprocess.run
    
    #you can inspect the stdout from the process
    complete_process = subprocess.run(<Login command>, text=True, capture_output=True)
    if "success" not in complete_process.stdout:
        return "uh-oh"
    #you can also just check the return value (0 typically means clean exit)
    if subprocess.run(<Image creation command>).returncode != 0:
        return "uh-oh"
    #or you can tell `run` to generate an error if the returncode is non-zero
    try:
        subprocess.run(<Copy to Other slaves or NFS>, check=True)
    except subprocess.CalledProcessError:
        return "uh-oh"
    return "we did it!"

def main():
    #...
    #...
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for id, (path2, path1) in enumerate(processed.items(), 1):
            target = path.join("/tmp", "dir" + str(id))
            ret = executor.submit(target_func, path1, path2, target)
            exit_status.append(ret)

    for i in exit_status:
        print("##########Result status: ", i.result())