Python: store results of ProcessPoolExecutor
I'm new to parallel processing with `concurrent.futures`. The code seems to work, but I'm not sure how to store the result of each process so that, at the end, the build is marked as failed if any process returned a non-zero value. I tried creating a list (exit_status) and appending the results to it, but that raised an IndexError. How should I go about this?
#!/usr/bin/env python3
import concurrent.futures
import sys
import shutil
import os
import glob
import multiprocessing as mp
import json
from os import path
def slave(path1, path2, target):
    os.makedirs(target)
    shutil.copy(path1, target)
    shutil.copy(path2, target)
    os.system(<Login command>)
    os.system(<Image creation command>)
    os.system(<Copy to Other slaves or NFS>)
    # If any one of the above operations or commands fails for any of the
    # processes, the script should return 1 at the end of the execution,
    # i.e. the build should be marked as failed.
def main():
    processed = {}
    exit_status = []
    with open('example.json', 'r') as f:
        data = json.load(f)
        for value in data.items():
            for line in value[1]:
                if line.endswith('.zip'):
                    targz = line
                elif line.endswith('.yaml'):
                    yaml = line
            processed[targz] = yaml
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for id, (path2, path1) in enumerate(processed.items(), 1):
            target = path.join("/tmp", "dir" + str(id))
            ret = executor.submit(slave, path1, path2, target)
            exit_status.append(ret.result())
        for i in exit_status:
            print("##########Result status: ", i)

if __name__ == "__main__":
    mp.set_start_method('spawn')
    main()
Output of the exit_status list:
##########Result status: None
##########Result status: None
Re: comment
If you want to capture the result of a system call so you can act on it, subprocess.run is more flexible and powerful than os.system. Also, if you actually want the tasks to run in parallel, you can't wait on result() right after each submit; otherwise you only ever run one task at a time. It's better to submit all the tasks and collect the Future objects, then iterate over those and wait on each result(), since by that point you have already handed the executor all the work you want it to do.
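As a side note, if you would rather consume results in completion order instead of submission order, concurrent.futures.as_completed does the collect-then-wait dance for you. A minimal, self-contained sketch (the work function and its inputs are placeholders, not from the question):

import concurrent.futures

def work(n):
    # placeholder task; stands in for the real worker function
    return n * n

def run_all():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # submit everything first so the tasks actually run in parallel
        futures = [executor.submit(work, n) for n in range(5)]
        # as_completed yields each Future as soon as it finishes
        for fut in concurrent.futures.as_completed(futures):
            print("Result status:", fut.result())

if __name__ == "__main__":
    run_all()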
import subprocess

def target_func(path1, path2, target):
    #...
    # instead of os.system, use subprocess.run
    # you can inspect the stdout from the process
    complete_process = subprocess.run(<Login command>, text=True, capture_output=True)
    if "success" not in complete_process.stdout:
        return "uh-oh"
    # you can also just check the return code (0 typically means clean exit)
    if subprocess.run(<Image creation command>).returncode != 0:
        return "uh-oh"
    # or you can tell `run` to raise an error if the return code is non-zero
    try:
        subprocess.run(<Copy to Other slaves or NFS>, check=True)
    except subprocess.CalledProcessError:
        return "uh-oh"
    return "we did it!"
def main():
    #...
    #...
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for id, (path2, path1) in enumerate(processed.items(), 1):
            target = path.join("/tmp", "dir" + str(id))
            ret = executor.submit(target_func, path1, path2, target)
            exit_status.append(ret)
        for i in exit_status:
            print("##########Result status: ", i.result())