Using concurrent.futures.ProcessPoolExecutor to run simultaneous & independent ABAQUS models
I want to run a total of nAnalysis=25 Abaqus models, each using X cores, and I can run nParallelLoops=5 of these models simultaneously. Whenever one of the current 5 analyses finishes, another one should start, until all nAnalysis are complete.
I implemented the code below based on the solutions posted in 1 and 2. However, I am missing something, because all nAnalysis try to start "at once", the code deadlocks, and no analysis ever completes, since many of them may want to use the same cores as an analysis that has already started.
- Using Python's Multiprocessing module to execute simultaneous and separate SEAWAT/MODFLOW model runs
def runABQfile(*args):
    import subprocess
    import os

    inpFile, path, jobVars = args
    prcStr1 = (path + '/runJob.sh')
    process = subprocess.check_call(prcStr1, stdin=None, stdout=None, stderr=None, shell=True, cwd=path)

def safeABQrun(*args):
    import os
    try:
        runABQfile(*args)
    except Exception as e:
        print("Thread Error: %s runABQfile(*%r)" % (e, args))

def errFunction(ppos, *args):
    import os
    from concurrent.futures import ProcessPoolExecutor
    from concurrent.futures import as_completed
    from concurrent.futures import wait

    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(0, nAnalysis))  # 5Nodes
        wait(future_to_file, timeout=None, return_when='ALL_COMPLETED')
The only way I have been able to run this so far is by modifying errFunction to use exactly 5 analyses at a time, as shown below. However, this approach sometimes results in one of the analyses taking much longer than the other 4 in its group (per ProcessPoolExecutor call), so the next group of 5 does not start even though resources (cores) are available. Ultimately this increases the time needed to complete all 25 models.
def errFunction(ppos, *args):
    import os
    from concurrent.futures import ProcessPoolExecutor
    from concurrent.futures import as_completed
    from concurrent.futures import wait

    # Group 1
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(0, 5))  # 5Nodes
        wait(future_to_file, timeout=None, return_when='ALL_COMPLETED')

    # Group 2
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(5, 10))  # 5Nodes
        wait(future_to_file, timeout=None, return_when='ALL_COMPLETED')

    # Group 3
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(10, 15))  # 5Nodes
        wait(future_to_file, timeout=None, return_when='ALL_COMPLETED')

    # Group 4
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(15, 20))  # 5Nodes
        wait(future_to_file, timeout=None, return_when='ALL_COMPLETED')

    # Group 5
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(20, 25))  # 5Nodes
        wait(future_to_file, timeout=None, return_when='ALL_COMPLETED')
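Incidentally, the five identical groups amount to a batched loop. As a runnable sketch (with a trivial stub standing in for the real safeABQrun), the same barrier-per-batch behavior can be written once:

```python
from concurrent.futures import ProcessPoolExecutor, wait

nAnalysis = 25
nParallelLoops = 5

def stubRun(k):
    # Stand-in for safeABQrun so this sketch runs on its own.
    return k

def groupedRun():
    done = []
    # One batch of nParallelLoops jobs at a time; wait() is the same
    # per-group barrier as in the five blocks above, so a straggler
    # still holds up the start of the next batch.
    for start in range(0, nAnalysis, nParallelLoops):
        with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
            futures = dict((executor.submit(stubRun, k), k)
                           for k in range(start, min(start + nParallelLoops, nAnalysis)))
            wait(futures, timeout=None, return_when='ALL_COMPLETED')
            done.extend(f.result() for f in futures)
    return done

if __name__ == "__main__":
    print(sorted(groupedRun()) == list(range(nAnalysis)))
```

This only removes the duplication; it does not fix the idle-cores problem, which comes from the barrier itself.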
I tried using the as_completed function, but it doesn't seem to work either.
Can you please help me figure out the proper way to parallelize this, so that I can run nAnalysis analyses in total, with nParallelLoops of them always running concurrently?
Thanks for your help.
I'm using Python 2.7.
Best,
David P.
UPDATE July 30, 2016:
I introduced a loop in safeABQrun that manages 5 different "queues". The loop is necessary to avoid the situation where an analysis tries to run on a node while another one is still running on it. The analyses are pre-configured to run on one of the requested nodes before any actual analysis starts.
from concurrent.futures import ProcessPoolExecutor, as_completed

def safeABQrun(*list_args):
    import os

    inpFiles, paths, jobVars = list_args
    nA = len(inpFiles)
    for k in range(0, nA):
        args = (inpFiles[k], paths[k], jobVars[k])
        try:
            runABQfile(*args)  # Actual Run Function
        except Exception as e:
            print("Thread Error: %s runABQfile(*%r)" % (e, args))

def errFunction(ppos, *args):
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        futures = dict((executor.submit(safeABQrun, inpF, aPth, jVrs), k) for inpF, aPth, jVrs, k in list_args)  # 5Nodes
        for f in as_completed(futures):
            print("|=== Finish Process Train %d ===|" % futures[f])
            if f.exception() is not None:
                print('%r generated an exception: %s' % (futures[f], f.exception()))
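The snippet above consumes list_args tuples of (inpFiles, paths, jobVars, k), but how the 25 analyses get split into the 5 trains isn't shown. A minimal sketch (all names and job lists hypothetical) using a round-robin split might look like:

```python
def buildTrains(inpFiles, paths, jobVars, nParallelLoops):
    # Hypothetical helper: round-robin the jobs into nParallelLoops
    # "trains", producing the (inpFiles, paths, jobVars, k) tuples
    # that errFunction unpacks from list_args.
    trains = []
    for k in range(nParallelLoops):
        idx = range(k, len(inpFiles), nParallelLoops)
        trains.append(([inpFiles[i] for i in idx],
                       [paths[i] for i in idx],
                       [jobVars[i] for i in idx],
                       k))
    return trains

# Placeholder job lists standing in for the real ABAQUS inputs.
inpFiles = ['job%02d.inp' % i for i in range(25)]
paths = ['/scratch/job%02d' % i for i in range(25)]
jobVars = [{} for _ in range(25)]

list_args = buildTrains(inpFiles, paths, jobVars, 5)
print(len(list_args), len(list_args[0][0]))  # 5 5
```

Each train then runs its 5 jobs sequentially inside one worker process, which is what keeps two analyses from competing for the same node.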
It looks fine to me, but I can't run your code as-is. How about trying something much simpler, then adding pieces until "a problem" shows up? For example, does the following display the kind of behavior you want? It does for me, but I'm running Python 3.5.2. You say you're running 2.7, but concurrent.futures doesn't exist in Python 2 - so if you're using 2.7, you must be running someone's backport library, and perhaps the problem lies there. Trying the following should help answer whether that's the case:
from concurrent.futures import ProcessPoolExecutor, wait, as_completed

def worker(i):
    from time import sleep
    from random import randrange
    s = randrange(1, 10)
    print("%d started and sleeping for %d" % (i, s))
    sleep(s)

if __name__ == "__main__":
    nAnalysis = 25
    nParallelLoops = 5
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor:
        futures = dict((executor.submit(worker, k), k) for k in range(nAnalysis))
        for f in as_completed(futures):
            print("got %d" % futures[f])
Typical output:
0 started and sleeping for 4
1 started and sleeping for 1
2 started and sleeping for 1
3 started and sleeping for 6
4 started and sleeping for 5
5 started and sleeping for 9
got 1
6 started and sleeping for 5
got 2
7 started and sleeping for 6
got 0
8 started and sleeping for 6
got 4
9 started and sleeping for 8
got 6
10 started and sleeping for 9
got 3
11 started and sleeping for 6
got 7
12 started and sleeping for 9
got 5
...
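If that demo misbehaves under 2.7, it's worth confirming which concurrent.futures is actually being imported (on 2.7 it would be the `futures` backport package from PyPI):

```python
import concurrent.futures

# On Python 3 this points into the standard library; under 2.7 it
# should point into site-packages, where the `futures` backport lives.
print(concurrent.futures.__file__)
```

If the path doesn't point where you expect, the backport installation is the first thing to investigate.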