运行 多个外部程序

Running multiple external programs

因此,我设置了一个执行外部程序的简短脚本(用 Fortran 77 编写)。我想要 运行 程序的多个实例,因为我的计算机上有 8 个内核,所以我找到的最简单的解决方案是:

import subprocess


import os


i = n

while(i<n):
  dire = "dir/Run"+str(i)+"/"
  os.chdir(dire)
  p1 = subprocess.Popen(['./mej'])
  dire = "dir/Run"+str(i+1)+"/"
  os.chdir(dire)
  p2 = subprocess.Popen(['./mej'])
  dire = "dir/Run"+str(i+2)+"/"
  os.chdir(dire)
  p3 = subprocess.Popen(['./mej'])
  dire = "dir/Run"+str(i+3)+"/"
  os.chdir(dire)
  p4 = subprocess.Popen(['./mej'])  
  dire = "dir/Run"+str(i+4)+"/"
  os.chdir(dire)
  p5 = subprocess.Popen(['./mej'])
  dire = "dir/Run"+str(i+5)+"/"
  os.chdir(dire)
  p6 = subprocess.Popen(['./mej'])
  dire = "dir/Run"+str(i+6)+"/"
  os.chdir(dire)
  p7 = subprocess.Popen(['./mej'])
  dire = "dir/Run"+str(i+7)+"/"
  os.chdir(dire)
  p8 = subprocess.Popen(['./mej'])
  dire = "/Run"+str(i+8)+"/"
  os.chdir(dire)
  p3 = subprocess.Popen(['./mej'])
  exit_codes = [p.wait() for p in p1, p2, p3, p4, p5, p6, p7, p8]
  i = i + 8



print "Job's done!"

现在一开始效果很好,但是我只是更改为可变时间步长,这样做时每个积分 运行s 的时间变化很大。现在的问题是脚本将等待最慢的脚本完成,然后再启动一组新的集成。我怎么写才能让我总是有 8 个实例 运行ning?

虽然给定 运行 的执行时间可能有很大差异,但通常可以安全地假设例如10 个连续 运行 的方差会小得多。

所以简单的解决方案A就是启动8个进程,每个进程调用10次外部程序,然后等待这些进程结束。您仍然需要等待最慢的进程,但开销会小得多。

当然有一个明显的解决方案 B:创建一个待处理的 运行 池,其中 8 个进程从池中挑选一个新的 运行一旦他们完成当前 运行。这将真正减少开销,但您必须在此处处理同步原语。

这是这 3 种方法(您使用的一种和我正在谈论的两种)的一个小例子:

红色小方块表示需要改进的地方。基本上,方法 A 避免停止每个线程,而是在每个 运行 之后停止一个线程。方法 B 走得更远,使一个线程已完成其所有 运行s 以从另一个线程获取一个。

您可以使用线程池,让所有 CPU 保持忙碌:

#!/usr/bin/env python
import os
import subprocess
from multiprocessing.pool import ThreadPool

def run(i):
    working_dir = "dir/Run/" + str(i + 1)
    return i, subprocess.call(os.path.join(working_dir, 'mej'), cwd=working_dir)

results = ThreadPool().map(run, range(n))

一旦一个 mej 进程完成,下一个进程就会启动。一次不超过 os.cpu_count() 个并发工作进程 运行。

你可以写一些类似的东西。定义 运行 的总数和可用核心的数量,以及检查是否完成的延迟。对于延迟,只需输入合理的秒数即可。如果一个进程 运行 平均需要 10 分钟,则延迟 60 秒或更少就足够了。

import subprocess
import time
import os

def runIt(rootDir, prog, i):
    dire = "dir/Run/" + str(i + 1)
    os.chdir(dire)
    return subprocess.Popen(['./mej'])

n=16    #total number of runs
nProc = 8 # number of cores
i = 0
delay = 2 #delays in  second to check if one has returned

pList = [runIt(p) for p in range(min(nProc, n))]
i = len(pList)
while(i<n):
    time.sleep(delay) # delays for delay seconds
    for j in range(len(pList)):
        pList[j].poll()
        if pList[j].returncode is not None and i<n:
            pList[j] = runIt(i)
            i = i+1
print "Job's done!"