使用 ProcessPoolExecutor 的并行处理在不返回错误的情况下不起作用
Parallel processing with ProcessPoolExecutor doesn´t work without returning an error
目前我正在尝试加速我的模拟。我已经用线程尝试过它并且它有效。现在我想尝试使用并行流程来比较两种方式。我使用 futures.ProcessPoolExecutor。当我启动我的脚本时,模拟时间被打印出来(它非常低)但是我的程序没有正常工作。通常它应该生成几个文件,但它们没有生成。此外,没有错误消息。我已经在书籍和互联网上对此进行了一些研究,但我无法找出问题所在。
这是我的代码:
def main(setting):
cfg_path = generate(settings[setting])
run_simulation(cfg_path)
if __name__ == '__main__':
settings = get_wrapper_input_json("Szenarioanalyse.json")
typ = "processes"
start = time.perf_counter()
if typ == "threads":
with futures.ThreadPoolExecutor(cpu_count()-1) as e:
e.map(main,settings)
elif typ == "processes":
with futures.ProcessPoolExecutor(cpu_count()-1) as e:
e.map(main,settings)
else:
for setting in settings:
main(setting)
print("Simulationtime: "+str(time.perf_counter()-1))
我已经解决了问题:
settings = get_wrapper_input_json("Szenarioanalyse.json") #get the settings
parameters = {"Threads":True,"Processes":False,"Serial":False}
def simulate(setting):
cfg_path = generate(settings[setting])
run_simulation(cfg_path)
if __name__ == '__main__':
for key,value in parameters.items():
if key == "Threads" and value == True:
start_threads = time.perf_counter()
with futures.ThreadPoolExecutor(10) as e:
e.map(simulate,settings)
print("Simulationtime "+key+": "+str(time.perf_counter()-start_threads))
elif key == "Processes" and value == True:
start_processes = time.perf_counter()
pool = multiprocessing.Pool(multiprocessing.cpu_count())
pool.map(simulate,settings)
print("Simulationtime "+key+": "+str(time.perf_counter()-start_processes))
elif key == "Serial" and value == True:
start_serial = time.perf_counter()
for setting in settings:
simulate(setting)
print("Simulationtime "+key+": "+str(time.perf_counter()-start_serial))
#save_dataframe("Szenarioanalyse.json")
#file_management()
目前我正在尝试加速我的模拟。我已经用线程尝试过它并且它有效。现在我想尝试使用并行流程来比较两种方式。我使用 futures.ProcessPoolExecutor。当我启动我的脚本时,模拟时间被打印出来(它非常低)但是我的程序没有正常工作。通常它应该生成几个文件,但它们没有生成。此外,没有错误消息。我已经在书籍和互联网上对此进行了一些研究,但我无法找出问题所在。
这是我的代码:
def main(setting):
cfg_path = generate(settings[setting])
run_simulation(cfg_path)
if __name__ == '__main__':
settings = get_wrapper_input_json("Szenarioanalyse.json")
typ = "processes"
start = time.perf_counter()
if typ == "threads":
with futures.ThreadPoolExecutor(cpu_count()-1) as e:
e.map(main,settings)
elif typ == "processes":
with futures.ProcessPoolExecutor(cpu_count()-1) as e:
e.map(main,settings)
else:
for setting in settings:
main(setting)
print("Simulationtime: "+str(time.perf_counter()-1))
我已经解决了问题:
settings = get_wrapper_input_json("Szenarioanalyse.json") #get the settings
parameters = {"Threads":True,"Processes":False,"Serial":False}
def simulate(setting):
cfg_path = generate(settings[setting])
run_simulation(cfg_path)
if __name__ == '__main__':
for key,value in parameters.items():
if key == "Threads" and value == True:
start_threads = time.perf_counter()
with futures.ThreadPoolExecutor(10) as e:
e.map(simulate,settings)
print("Simulationtime "+key+": "+str(time.perf_counter()-start_threads))
elif key == "Processes" and value == True:
start_processes = time.perf_counter()
pool = multiprocessing.Pool(multiprocessing.cpu_count())
pool.map(simulate,settings)
print("Simulationtime "+key+": "+str(time.perf_counter()-start_processes))
elif key == "Serial" and value == True:
start_serial = time.perf_counter()
for setting in settings:
simulate(setting)
print("Simulationtime "+key+": "+str(time.perf_counter()-start_serial))
#save_dataframe("Szenarioanalyse.json")
#file_management()