运行 若干组 QProcess' 彼此相继

Run a number of groups of QProcess' after each other

我需要调用外部程序 N 次。我想并行执行此操作。所以到目前为止,我的策略是启动 N QProcesses 并记录已启动和已完成的进程数。 (这样我就可以知道他们什么时候都完成了)。

但是,外部程序占用了相当多的 RAM,所以我不希望同时有 4 个并行进程。

什么是好的策略?

我认为 signals/slots 不足以实现此目的(我想不出一种不高度复杂的方法)...也许我可以用队列做点什么?

如何确保在任何时候我只有 4 个进程 运行? 我如何才能确定所有 N 进程何时最终完成?

(首选 pyside/pyqt 的答案,但 C++ 也可以)

概念验证:

h 文件

#ifndef CPROCESSRUNNER_H
#define CPROCESSRUNNER_H

#include <QObject>
#include <QQueue>

class QProcess;

class CProcessRunner : public QObject
{
    Q_OBJECT
    Q_PROPERTY(int processCount READ processCount WRITE setProcessCount)
public:
    explicit CProcessRunner(QObject *parent = 0);
    ~CProcessRunner();

    void addProcess(const QString& program);

    int processCount() const;

public slots:
    void setProcessCount(int arg);

private slots:
    void startFreeProcesses();

private:
    int getActiveProcessCount() const;

    QQueue<QProcess*> m_processes;
    int m_processCount;
};

#endif // CPROCESSRUNNER_H

cpp 文件

#include "CProcessRunner.h"
#include <QProcess>
CProcessRunner::CProcessRunner(QObject *parent)
    : QObject(parent)
    , m_processCount(0)
{

}

CProcessRunner::~CProcessRunner()
{

}

void CProcessRunner::addProcess(const QString& program)
{
    QProcess* pProcess = new QProcess(this);
    pProcess->setObjectName(program);
    m_processes.enqueue(pProcess);
    startFreeProcesses();
}

int CProcessRunner::processCount() const
{
    return m_processCount;
}

void CProcessRunner::setProcessCount(int arg)
{
    m_processCount = arg;
}

void CProcessRunner::startFreeProcesses()
{
    while (!m_processes.isEmpty() && (getActiveProcessCount() < m_processCount)) {
        QProcess* pProcess = m_processes.dequeue();
        connect(pProcess, SIGNAL(finished(int)), this, SLOT(startFreeProcesses()));
        connect(pProcess, SIGNAL(finished(int)), pProcess, SLOT(deleteLater()));

        pProcess->start(pProcess->objectName());
        pProcess->waitForStarted(-1);
    }
}

int CProcessRunner::getActiveProcessCount() const
{
    int result = 0;
    foreach (QProcess* pProcess, findChildren<QProcess*>()) {
        if (pProcess->state() == QProcess::Running) {
            result++;
        }
    }
    return result;
}

作为参考,这是我目前使用 Pyside 的实现。 我希望能够捕获每个进程的输出并将其传递(例如显示在 QTextEdit 或类似文件中)。为此,我为每个进程分配了一个编号,并将其 stdout 和 stderr 连接到插槽,然后插槽将发出消息和进程位置。 我不相信我目前对该功能的实现,但这里是:

class ProcessRunner(QtCore.QObject):  
'''
Runs N processes in groups of M
'''    

singleProcessFinished = QtCore.Signal()
error = QtCore.Signal(tuple) #(process number, message)
message = QtCore.Signal(tuple) #(process number, message)

def __init__(self, maxProcesses=4, parent=None):
  super(ProcessRunner, self).__init__(parent)
  self.processQueue = Queue.Queue()  
  self.maxProcesses = maxProcesses
  self.activeProcessCount = 0

  self._processNumber = 0

def addProcess(self, program, args):
  '''
  Add a process to the process queue

  Args:

    program (str): String of program path and arguments, separated by one or more spaces    
  '''
  self._processNumber += 1
  if self._processNumber == self.maxProcesses:
    self._processNumber = 0


  proc = QtCore.QProcess(self)
  proc.readyReadStandardError.connect(lambda pos=self._processNumber, process=proc: self.emit_error(pos, proc))
  proc.readyReadStandardOutput.connect(lambda pos=self._processNumber, process=proc: self.emit_output(pos, proc))
  self.processQueue.put((proc, program, args))
  self.startFreeProcesses()

def startFreeProcesses(self):
  '''
  Starts all waiting processes up to a maximum of self.maxProcesses
  '''
  while (not self.processQueue.empty()) and (self.activeProcessCount < self.maxProcesses):
    proc, program, args = self.processQueue.get()
    proc.finished.connect(self.startFreeProcesses)
    proc.finished.connect(proc.deleteLater)
    proc.finished.connect(self.singleProcessFinished.emit)
    proc.finished.connect(self._decreaseActiveProcessCount)

    proc.start(program, args)
    proc.waitForStarted(-1)
    self._increaseActiveProcessCount()

def _decreaseActiveProcessCount(self):
  self.activeProcessCount -= 1

def _increaseActiveProcessCount(self):
  self.activeProcessCount += 1

def emit_error(self, pos, proc):    
  self.error.emit(pos, str(proc.readAllStandardError()))

def emit_error(self, pos, proc):    
  self.error.emit(pos, str(proc.readAllStandardOutput()))