如何访问包含在 Qthread 中的多处理工作线程内部的变量?
How do I access to a variable inside a multiprocessing worker itself contained in a Qthread?
我正在尝试访问 QThread 中的多处理工作线程中的变量。
我举了一个最小的例子来强调我的观点:
from PyQt5.QtGui import *
from PyQt5.QtCore import *
from PyQt5.QtWidgets import *
import sys
import numpy as np
import multiprocessing
class my_Thread(QThread):
finished = pyqtSignal()
def __init__(self,M=100, N=100, nbCore=2):
QThread.__init__(self, )
self.M = M
self.N = N
self.nbCore = nbCore
def run(self):
self.my_worker = mp_worker_class()
self.Mean = self.my_worker.start(self.nbCore, self.M, self.N)
self.finished.emit()
def returninfo(self):
return self.my_worker.nbiter
class mp_worker_class():
nbiter = 0
def __init__(self,):
pass
@classmethod
def start(self, nbCore=2, M=100, N=100 ):
self.nbiter = 0
X = np.random.rand(nbCore,M,N)
pipe_list = []
for i in range(nbCore):
recv_end, send_end = multiprocessing.Pipe()
p = multiprocessing.Process(target=self.mp_worker, args=(X[i,:,:] , send_end))
p.start()
pipe_list.append(recv_end)
for idx, recv_end in enumerate(pipe_list):
Ymean =recv_end.recv()
print(Ymean)
@classmethod
def mp_worker(self, X=None, send_end=None):
mean = 0
nb =0
for i in range(X.shape[0]):
for j in range(X.shape[1]):
# print(self.nbiter)
mean += X[i,j]
nb += 1
self.nbiter += 1
mean /= nb
send_end.send([mean])
class GUI(QMainWindow):
def __init__(self, parent=None):
super(GUI, self).__init__()
self.parent = parent
self.centralWidget = QWidget()
self.setCentralWidget(self.centralWidget)
self.VBOX = QVBoxLayout()
self.info_LE = QLineEdit()
self.start_PB = QPushButton('Start')
self.start_PB.clicked.connect(self.startThread)
self.VBOX.addWidget(self.info_LE)
self.VBOX.addWidget(self.start_PB)
self.centralWidget.setLayout(self.VBOX)
def startThread(self):
self.thread = my_Thread(M=10000, N=10000, nbCore=5)
self.thread.finished.connect(self.threadFinished)
self.timer = QTimer()
self.timer.setInterval(100)
self.timer.timeout.connect(lambda msg=self.info_LE, x=self.thread: self.updatemsgs(msg, x))
self.timer.start()
self.ElapsedTimer = QElapsedTimer()
self.ElapsedTimer.start()
self.thread.start()
def threadFinished(self):
self.timer.stop()
self.thread.exit()
print('Finished')
def updatemsgs(self, msg, Obj):
nbiter = Obj.returninfo()
print(nbiter)
msg.setText(str(nbiter))
self.parent.processEvents()
if __name__ == "__main__":
app = QApplication(sys.argv)
ex = GUI(app)
ex.show()
sys.exit(app.exec())
在此示例中,我创建了继承自 Qthread
的 class my_Thread
。在这个 QThread
class 中,我通过 class mp_worker_class
调用了一个多处理工作线程,它并行调用了函数 mp_worker
的 5 倍。在 class mp_worker_class
中,我有一个变量 nbiter = 0
,每次我在函数 mp_worker
中执行循环时它都会增加 1。我可以验证 nbiter
确实在增加,因为我可以通过打印看到它的价值。但是从 my_Thread.returninfo()
函数中,我只是 return 来自 mp_worker_class
class 的 nbiter
值,我只得到零。
我想要的是在我可以在 GUI 中看到的 pyqt5 QlineEdit wighet (info_LE) 中打印 mp_worker_class.nbiter
的值。我每 0.1 秒更新一次文本。现在它只打印零。
Python 中的子进程默认不共享内存——一个进程中的代码 运行 无法访问或更改另一个进程使用的内存。这意味着每个进程都有您正在使用的每个变量的副本,包括 mp_worker_class.nbiter
变量。因此,您看不到子进程从父进程(或任何其他子进程)对其 mp_worker_class.nbiter
变量所做的更改。
如您所见,我们可以使用multiprocessing.Process
构造函数的args
关键字参数将数据从父进程获取到子进程。但是,这只是将数据从父级复制到子级;我们仍然没有在两个进程之间共享内存。
import multiprocessing
def my_example(arg):
arg.append(35)
print("Child arg:",arg)
if __name__ == "__main__":
l = [1,2,3]
print("Before:",l)
p = multiprocessing.Process(target=my_example, args=(l,))
p.start()
p.join()
print("After:",l)
# Before: [1, 2, 3]
# Child arg: [1, 2, 3, 35]
# After: [1, 2, 3]
幸运的是,multiprocessing
提供了一个 Value class 可以很容易地在共享内存中创建变量。关键是在父进程中创建Value
,然后通过args
参数将Value
分发给子进程到multiprocessing.Process
。
在您的代码中,您可以在 my_Thread
的构造函数中创建 multiprocessing.Value
。例如,您可以添加
self.nbiter = multiprocessing.Value('i',0)
这将创建一个整数 multiprocessing.Value
(即 i
的含义)并将其初始化为 0。然后您可以将此 Value
传递给 self.my_worker.start
类方法,反过来,它可以将 Value
传递给它的子 mp_worker
进程。
与 multiprocessing.Value
对象关联的原始值可以通过其 value
属性访问。因此,您需要更改 mp_worker
类方法中的代码以更改 multiprocessing.Value
对象的 value
属性。
您还需要考虑这样一个事实,即在使用 multiprocessing.Value
时 +=
操作不是原子操作。因此,您的代码需要在递增之前获取 Value
上的锁。如果为 mp_worker
创建了一个名为 nbiter
的新参数,用于递增 nbiter
的代码应该如下所示。
with nbiter.get_lock():
nbiter.value += 1
您还需要将 my_Thread.returninfo
方法更改为 return self.nbiter.value
。如果 my_Thread
由于某种原因将重新启动,您可能还想在 my_Thread.run
方法的开头设置 self.nbiter.value = 0
。
总而言之,您的代码可能如下所示。
from PyQt5.QtGui import *
from PyQt5.QtCore import *
from PyQt5.QtWidgets import *
import sys
import numpy as np
import multiprocessing
class my_Thread(QThread):
finished = pyqtSignal()
def __init__(self,M=100, N=100, nbCore=2):
QThread.__init__(self, )
self.M = M
self.N = N
self.nbCore = nbCore
self.nbiter = multiprocessing.Value('i',0)
def run(self):
self.nbiter.value = 0
self.my_worker = mp_worker_class()
self.Mean = self.my_worker.start(self.nbCore, self.M, self.N, self.nbiter)
self.finished.emit()
def returninfo(self):
return self.nbiter.value
class mp_worker_class():
def __init__(self,):
pass
@classmethod
def start(self, nbCore=2, M=100, N=100, nbiter=None ):
X = np.random.rand(nbCore,M,N)
pipe_list = []
for i in range(nbCore):
recv_end, send_end = multiprocessing.Pipe()
p = multiprocessing.Process(target=self.mp_worker, args=(X[i,:,:] , send_end, nbiter))
p.start()
pipe_list.append(recv_end)
for idx, recv_end in enumerate(pipe_list):
Ymean =recv_end.recv()
print(Ymean)
@classmethod
def mp_worker(self, X=None, send_end=None, nbiter=None):
mean = 0
nb =0
for i in range(X.shape[0]):
for j in range(X.shape[1]):
# print(self.nbiter)
mean += X[i,j]
nb += 1
with nbiter.get_lock():
nbiter.value += 1
mean /= nb
send_end.send([mean])
class GUI(QMainWindow):
def __init__(self, parent=None):
super(GUI, self).__init__()
self.parent = parent
self.centralWidget = QWidget()
self.setCentralWidget(self.centralWidget)
self.VBOX = QVBoxLayout()
self.info_LE = QLineEdit()
self.start_PB = QPushButton('Start')
self.start_PB.clicked.connect(self.startThread)
self.VBOX.addWidget(self.info_LE)
self.VBOX.addWidget(self.start_PB)
self.centralWidget.setLayout(self.VBOX)
def startThread(self):
self.thread = my_Thread(M=10000, N=10000, nbCore=5)
self.thread.finished.connect(self.threadFinished)
self.timer = QTimer()
self.timer.setInterval(100)
self.timer.timeout.connect(lambda msg=self.info_LE, x=self.thread: self.updatemsgs(msg, x))
self.timer.start()
self.ElapsedTimer = QElapsedTimer()
self.ElapsedTimer.start()
self.thread.start()
def threadFinished(self):
self.timer.stop()
self.thread.exit()
print('Finished')
def updatemsgs(self, msg, Obj):
nbiter = Obj.returninfo()
print(nbiter)
msg.setText(str(nbiter))
self.parent.processEvents()
if __name__ == "__main__":
app = QApplication(sys.argv)
ex = GUI(app)
ex.show()
sys.exit(app.exec())
我正在尝试访问 QThread 中的多处理工作线程中的变量。
我举了一个最小的例子来强调我的观点:
from PyQt5.QtGui import *
from PyQt5.QtCore import *
from PyQt5.QtWidgets import *
import sys
import numpy as np
import multiprocessing
class my_Thread(QThread):
finished = pyqtSignal()
def __init__(self,M=100, N=100, nbCore=2):
QThread.__init__(self, )
self.M = M
self.N = N
self.nbCore = nbCore
def run(self):
self.my_worker = mp_worker_class()
self.Mean = self.my_worker.start(self.nbCore, self.M, self.N)
self.finished.emit()
def returninfo(self):
return self.my_worker.nbiter
class mp_worker_class():
nbiter = 0
def __init__(self,):
pass
@classmethod
def start(self, nbCore=2, M=100, N=100 ):
self.nbiter = 0
X = np.random.rand(nbCore,M,N)
pipe_list = []
for i in range(nbCore):
recv_end, send_end = multiprocessing.Pipe()
p = multiprocessing.Process(target=self.mp_worker, args=(X[i,:,:] , send_end))
p.start()
pipe_list.append(recv_end)
for idx, recv_end in enumerate(pipe_list):
Ymean =recv_end.recv()
print(Ymean)
@classmethod
def mp_worker(self, X=None, send_end=None):
mean = 0
nb =0
for i in range(X.shape[0]):
for j in range(X.shape[1]):
# print(self.nbiter)
mean += X[i,j]
nb += 1
self.nbiter += 1
mean /= nb
send_end.send([mean])
class GUI(QMainWindow):
def __init__(self, parent=None):
super(GUI, self).__init__()
self.parent = parent
self.centralWidget = QWidget()
self.setCentralWidget(self.centralWidget)
self.VBOX = QVBoxLayout()
self.info_LE = QLineEdit()
self.start_PB = QPushButton('Start')
self.start_PB.clicked.connect(self.startThread)
self.VBOX.addWidget(self.info_LE)
self.VBOX.addWidget(self.start_PB)
self.centralWidget.setLayout(self.VBOX)
def startThread(self):
self.thread = my_Thread(M=10000, N=10000, nbCore=5)
self.thread.finished.connect(self.threadFinished)
self.timer = QTimer()
self.timer.setInterval(100)
self.timer.timeout.connect(lambda msg=self.info_LE, x=self.thread: self.updatemsgs(msg, x))
self.timer.start()
self.ElapsedTimer = QElapsedTimer()
self.ElapsedTimer.start()
self.thread.start()
def threadFinished(self):
self.timer.stop()
self.thread.exit()
print('Finished')
def updatemsgs(self, msg, Obj):
nbiter = Obj.returninfo()
print(nbiter)
msg.setText(str(nbiter))
self.parent.processEvents()
if __name__ == "__main__":
app = QApplication(sys.argv)
ex = GUI(app)
ex.show()
sys.exit(app.exec())
在此示例中,我创建了继承自 Qthread
的 class my_Thread
。在这个 QThread
class 中,我通过 class mp_worker_class
调用了一个多处理工作线程,它并行调用了函数 mp_worker
的 5 倍。在 class mp_worker_class
中,我有一个变量 nbiter = 0
,每次我在函数 mp_worker
中执行循环时它都会增加 1。我可以验证 nbiter
确实在增加,因为我可以通过打印看到它的价值。但是从 my_Thread.returninfo()
函数中,我只是 return 来自 mp_worker_class
class 的 nbiter
值,我只得到零。
我想要的是在我可以在 GUI 中看到的 pyqt5 QlineEdit wighet (info_LE) 中打印 mp_worker_class.nbiter
的值。我每 0.1 秒更新一次文本。现在它只打印零。
Python 中的子进程默认不共享内存——一个进程中的代码 运行 无法访问或更改另一个进程使用的内存。这意味着每个进程都有您正在使用的每个变量的副本,包括 mp_worker_class.nbiter
变量。因此,您看不到子进程从父进程(或任何其他子进程)对其 mp_worker_class.nbiter
变量所做的更改。
如您所见,我们可以使用multiprocessing.Process
构造函数的args
关键字参数将数据从父进程获取到子进程。但是,这只是将数据从父级复制到子级;我们仍然没有在两个进程之间共享内存。
import multiprocessing
def my_example(arg):
arg.append(35)
print("Child arg:",arg)
if __name__ == "__main__":
l = [1,2,3]
print("Before:",l)
p = multiprocessing.Process(target=my_example, args=(l,))
p.start()
p.join()
print("After:",l)
# Before: [1, 2, 3]
# Child arg: [1, 2, 3, 35]
# After: [1, 2, 3]
幸运的是,multiprocessing
提供了一个 Value class 可以很容易地在共享内存中创建变量。关键是在父进程中创建Value
,然后通过args
参数将Value
分发给子进程到multiprocessing.Process
。
在您的代码中,您可以在 my_Thread
的构造函数中创建 multiprocessing.Value
。例如,您可以添加
self.nbiter = multiprocessing.Value('i',0)
这将创建一个整数 multiprocessing.Value
(即 i
的含义)并将其初始化为 0。然后您可以将此 Value
传递给 self.my_worker.start
类方法,反过来,它可以将 Value
传递给它的子 mp_worker
进程。
与 multiprocessing.Value
对象关联的原始值可以通过其 value
属性访问。因此,您需要更改 mp_worker
类方法中的代码以更改 multiprocessing.Value
对象的 value
属性。
您还需要考虑这样一个事实,即在使用 multiprocessing.Value
时 +=
操作不是原子操作。因此,您的代码需要在递增之前获取 Value
上的锁。如果为 mp_worker
创建了一个名为 nbiter
的新参数,用于递增 nbiter
的代码应该如下所示。
with nbiter.get_lock():
nbiter.value += 1
您还需要将 my_Thread.returninfo
方法更改为 return self.nbiter.value
。如果 my_Thread
由于某种原因将重新启动,您可能还想在 my_Thread.run
方法的开头设置 self.nbiter.value = 0
。
总而言之,您的代码可能如下所示。
from PyQt5.QtGui import *
from PyQt5.QtCore import *
from PyQt5.QtWidgets import *
import sys
import numpy as np
import multiprocessing
class my_Thread(QThread):
finished = pyqtSignal()
def __init__(self,M=100, N=100, nbCore=2):
QThread.__init__(self, )
self.M = M
self.N = N
self.nbCore = nbCore
self.nbiter = multiprocessing.Value('i',0)
def run(self):
self.nbiter.value = 0
self.my_worker = mp_worker_class()
self.Mean = self.my_worker.start(self.nbCore, self.M, self.N, self.nbiter)
self.finished.emit()
def returninfo(self):
return self.nbiter.value
class mp_worker_class():
def __init__(self,):
pass
@classmethod
def start(self, nbCore=2, M=100, N=100, nbiter=None ):
X = np.random.rand(nbCore,M,N)
pipe_list = []
for i in range(nbCore):
recv_end, send_end = multiprocessing.Pipe()
p = multiprocessing.Process(target=self.mp_worker, args=(X[i,:,:] , send_end, nbiter))
p.start()
pipe_list.append(recv_end)
for idx, recv_end in enumerate(pipe_list):
Ymean =recv_end.recv()
print(Ymean)
@classmethod
def mp_worker(self, X=None, send_end=None, nbiter=None):
mean = 0
nb =0
for i in range(X.shape[0]):
for j in range(X.shape[1]):
# print(self.nbiter)
mean += X[i,j]
nb += 1
with nbiter.get_lock():
nbiter.value += 1
mean /= nb
send_end.send([mean])
class GUI(QMainWindow):
def __init__(self, parent=None):
super(GUI, self).__init__()
self.parent = parent
self.centralWidget = QWidget()
self.setCentralWidget(self.centralWidget)
self.VBOX = QVBoxLayout()
self.info_LE = QLineEdit()
self.start_PB = QPushButton('Start')
self.start_PB.clicked.connect(self.startThread)
self.VBOX.addWidget(self.info_LE)
self.VBOX.addWidget(self.start_PB)
self.centralWidget.setLayout(self.VBOX)
def startThread(self):
self.thread = my_Thread(M=10000, N=10000, nbCore=5)
self.thread.finished.connect(self.threadFinished)
self.timer = QTimer()
self.timer.setInterval(100)
self.timer.timeout.connect(lambda msg=self.info_LE, x=self.thread: self.updatemsgs(msg, x))
self.timer.start()
self.ElapsedTimer = QElapsedTimer()
self.ElapsedTimer.start()
self.thread.start()
def threadFinished(self):
self.timer.stop()
self.thread.exit()
print('Finished')
def updatemsgs(self, msg, Obj):
nbiter = Obj.returninfo()
print(nbiter)
msg.setText(str(nbiter))
self.parent.processEvents()
if __name__ == "__main__":
app = QApplication(sys.argv)
ex = GUI(app)
ex.show()
sys.exit(app.exec())