如何访问包含在 Qthread 中的多处理工作线程内部的变量?

How do I access to a variable inside a multiprocessing worker itself contained in a Qthread?

我正在尝试访问 QThread 中的多处理工作线程中的变量。

我举了一个最小的例子来强调我的观点:

from PyQt5.QtGui import *
from PyQt5.QtCore import *
from PyQt5.QtWidgets import *

import sys
import numpy as np
import multiprocessing

class my_Thread(QThread):
    finished = pyqtSignal()

    def __init__(self,M=100, N=100, nbCore=2):
        QThread.__init__(self, )
        self.M = M
        self.N = N
        self.nbCore = nbCore

    def run(self):
        self.my_worker = mp_worker_class()
        self.Mean = self.my_worker.start(self.nbCore, self.M, self.N)
        self.finished.emit()

    def returninfo(self):
        return self.my_worker.nbiter

class mp_worker_class():
    nbiter = 0
    def __init__(self,):
        pass

    @classmethod
    def start(self, nbCore=2, M=100, N=100 ):
        self.nbiter = 0
        X = np.random.rand(nbCore,M,N)
        pipe_list = []
        for i in range(nbCore):
            recv_end, send_end = multiprocessing.Pipe()
            p = multiprocessing.Process(target=self.mp_worker, args=(X[i,:,:] , send_end))
            p.start()
            pipe_list.append(recv_end)

        for idx, recv_end in enumerate(pipe_list):
            Ymean =recv_end.recv()
            print(Ymean)

    @classmethod
    def mp_worker(self, X=None, send_end=None):
        mean = 0
        nb =0
        for i in range(X.shape[0]):
            for j in range(X.shape[1]):
                # print(self.nbiter)
                mean += X[i,j]
                nb += 1
                self.nbiter += 1
        mean /= nb
        send_end.send([mean])


class GUI(QMainWindow):
    def __init__(self, parent=None):
        super(GUI, self).__init__()
        self.parent = parent

        self.centralWidget = QWidget()
        self.setCentralWidget(self.centralWidget)

        self.VBOX = QVBoxLayout()
        self.info_LE = QLineEdit()
        self.start_PB = QPushButton('Start')
        self.start_PB.clicked.connect(self.startThread)
        self.VBOX.addWidget(self.info_LE)
        self.VBOX.addWidget(self.start_PB)

        self.centralWidget.setLayout(self.VBOX)

    def startThread(self):
        self.thread = my_Thread(M=10000, N=10000, nbCore=5)
        self.thread.finished.connect(self.threadFinished)

        self.timer = QTimer()
        self.timer.setInterval(100)
        self.timer.timeout.connect(lambda msg=self.info_LE, x=self.thread: self.updatemsgs(msg, x))
        self.timer.start()
        self.ElapsedTimer = QElapsedTimer()
        self.ElapsedTimer.start()

        self.thread.start()

    def threadFinished(self):
        self.timer.stop()
        self.thread.exit()
        print('Finished')

    def updatemsgs(self, msg, Obj):
        nbiter = Obj.returninfo()
        print(nbiter)
        msg.setText(str(nbiter))
        self.parent.processEvents()

if __name__ == "__main__":
    app = QApplication(sys.argv)
    ex = GUI(app)
    ex.show()
    sys.exit(app.exec())

在此示例中,我创建了继承自 Qthread 的 class my_Thread。在这个 QThread class 中,我通过 class mp_worker_class 调用了一个多处理工作线程,它并行调用了函数 mp_worker 的 5 倍。在 class mp_worker_class 中,我有一个变量 nbiter = 0,每次我在函数 mp_worker 中执行循环时它都会增加 1。我可以验证 nbiter 确实在增加,因为我可以通过打印看到它的价值。但是从 my_Thread.returninfo() 函数中,我只是 return 来自 mp_worker_class class 的 nbiter 值,我只得到零。

我想要的是在我可以在 GUI 中看到的 pyqt5 QlineEdit wighet (info_LE) 中打印 mp_worker_class.nbiter 的值。我每 0.1 秒更新一次文本。现在它只打印零。

Python 中的子进程默认不共享内存——一个进程中的代码 运行 无法访问或更改另一个进程使用的内存。这意味着每个进程都有您正在使用的每个变量的副本,包括 mp_worker_class.nbiter 变量。因此,您看不到子进程从父进程(或任何其他子进程)对其 mp_worker_class.nbiter 变量所做的更改。

如您所见,我们可以使用multiprocessing.Process 构造函数的args 关键字参数将数据从父进程获取到子进程。但是,这只是将数据从父级复制到子级;我们仍然没有在两个进程之间共享内存。

import multiprocessing

def my_example(arg):
    arg.append(35)
    print("Child arg:",arg)
    
if __name__ == "__main__":
    l = [1,2,3]
    print("Before:",l)
    p = multiprocessing.Process(target=my_example, args=(l,))
    p.start()
    p.join()
    print("After:",l)
    
# Before: [1, 2, 3]
# Child arg: [1, 2, 3, 35]
# After: [1, 2, 3]

幸运的是,multiprocessing 提供了一个 Value class 可以很容易地在共享内存中创建变量。关键是在父进程中创建Value,然后通过args参数将Value分发给子进程到multiprocessing.Process


在您的代码中,您可以在 my_Thread 的构造函数中创建 multiprocessing.Value。例如,您可以添加

self.nbiter = multiprocessing.Value('i',0)

这将创建一个整数 multiprocessing.Value(即 i 的含义)并将其初始化为 0。然后您可以将此 Value 传递给 self.my_worker.start 类方法,反过来,它可以将 Value 传递给它的子 mp_worker 进程。

multiprocessing.Value 对象关联的原始值可以通过其 value 属性访问。因此,您需要更改 mp_worker 类方法中的代码以更改 multiprocessing.Value 对象的 value 属性。

您还需要考虑这样一个事实,即在使用 multiprocessing.Value+= 操作不是原子操作。因此,您的代码需要在递增之前获取 Value 上的锁。如果为 mp_worker 创建了一个名为 nbiter 的新参数,用于递增 nbiter 的代码应该如下所示。

with nbiter.get_lock():
    nbiter.value += 1

您还需要将 my_Thread.returninfo 方法更改为 return self.nbiter.value。如果 my_Thread 由于某种原因将重新启动,您可能还想在 my_Thread.run 方法的开头设置 self.nbiter.value = 0

总而言之,您的代码可能如下所示。

from PyQt5.QtGui import *
from PyQt5.QtCore import *
from PyQt5.QtWidgets import *

import sys
import numpy as np
import multiprocessing

class my_Thread(QThread):
    finished = pyqtSignal()

    def __init__(self,M=100, N=100, nbCore=2):
        QThread.__init__(self, )
        self.M = M
        self.N = N
        self.nbCore = nbCore
        self.nbiter = multiprocessing.Value('i',0)

    def run(self):
        self.nbiter.value = 0
        self.my_worker = mp_worker_class()
        self.Mean = self.my_worker.start(self.nbCore, self.M, self.N, self.nbiter)
        self.finished.emit()

    def returninfo(self):
        return self.nbiter.value

class mp_worker_class():
    def __init__(self,):
        pass

    @classmethod
    def start(self, nbCore=2, M=100, N=100, nbiter=None ):
        X = np.random.rand(nbCore,M,N)
        pipe_list = []
        for i in range(nbCore):
            recv_end, send_end = multiprocessing.Pipe()
            p = multiprocessing.Process(target=self.mp_worker, args=(X[i,:,:] , send_end, nbiter))
            p.start()
            pipe_list.append(recv_end)

        for idx, recv_end in enumerate(pipe_list):
            Ymean =recv_end.recv()
            print(Ymean)

    @classmethod
    def mp_worker(self, X=None, send_end=None, nbiter=None):
        mean = 0
        nb =0
        for i in range(X.shape[0]):
            for j in range(X.shape[1]):
                # print(self.nbiter)
                mean += X[i,j]
                nb += 1
                with nbiter.get_lock():
                    nbiter.value += 1
        mean /= nb
        send_end.send([mean])


class GUI(QMainWindow):
    def __init__(self, parent=None):
        super(GUI, self).__init__()
        self.parent = parent

        self.centralWidget = QWidget()
        self.setCentralWidget(self.centralWidget)

        self.VBOX = QVBoxLayout()
        self.info_LE = QLineEdit()
        self.start_PB = QPushButton('Start')
        self.start_PB.clicked.connect(self.startThread)
        self.VBOX.addWidget(self.info_LE)
        self.VBOX.addWidget(self.start_PB)

        self.centralWidget.setLayout(self.VBOX)

    def startThread(self):
        self.thread = my_Thread(M=10000, N=10000, nbCore=5)
        self.thread.finished.connect(self.threadFinished)

        self.timer = QTimer()
        self.timer.setInterval(100)
        self.timer.timeout.connect(lambda msg=self.info_LE, x=self.thread: self.updatemsgs(msg, x))
        self.timer.start()
        self.ElapsedTimer = QElapsedTimer()
        self.ElapsedTimer.start()

        self.thread.start()

    def threadFinished(self):
        self.timer.stop()
        self.thread.exit()
        print('Finished')

    def updatemsgs(self, msg, Obj):
        nbiter = Obj.returninfo()
        print(nbiter)
        msg.setText(str(nbiter))
        self.parent.processEvents()

if __name__ == "__main__":
    app = QApplication(sys.argv)
    ex = GUI(app)
    ex.show()
    sys.exit(app.exec())