如果从 QThread 启动,Joblib Parallel 仅使用一个内核
Joblib Parallel uses only one core if started from QThread
我正在开发一个 GUI,它需要执行一些繁重的数值计算(number crunching)。为了加快速度,我将 joblib 的并行执行与 PyQt 的 QThread 结合使用,以避免 GUI 失去响应。到目前为止,并行执行本身工作正常,但如果把它嵌入到 GUI 中并在它自己的线程中运行,它只使用我 4 个核心中的一个。我在线程/多进程(threading/multiprocessing)方面遗漏了什么基础知识?
这是我的设置的粗略草图:
class ThreadRunner(QtCore.QObject):
    """Worker object that runs one callable and emits its result.

    ``__init__`` connects the ``start`` signal to :meth:`run`, so emitting
    ``start`` triggers the stored call.
    """

    # Emitted to trigger ``run``.
    start = QtCore.pyqtSignal()
    # Carries the callable's return value; declared with an ``np.ndarray``
    # payload.
    result_finished = QtCore.pyqtSignal(np.ndarray)

    def __init__(self, function, *args, **kwargs):
        """Store the callable and its arguments for later execution.

        Args:
            function: Callable invoked by ``run``.
            *args: Positional arguments forwarded to ``function``.
            **kwargs: Keyword arguments forwarded to ``function``.
        """
        # The original passed ``DispMapRunner`` to ``super``; that name is not
        # defined anywhere in the visible code, so use this class instead.
        super(ThreadRunner, self).__init__()
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.start.connect(self.run)

    def run(self):
        """Call the stored function and emit its result."""
        # Parenthesised single-argument print works on Python 2 and 3 alike.
        print("New Thread started")
        result = self.function(*self.args, **self.kwargs)
        self.result_finished.emit(result)
class Gui(QtGui.QMainWindow, form_class):
    """Main window that hands work to a ``ThreadRunner`` on a ``QThread``."""

    def __init__(self, cl_args, parent=None):
        """Create the window.

        Args:
            cl_args: Not used by the visible code.
            parent: Optional parent widget, forwarded to the Qt base class.
        """
        # The original accepted ``parent`` but silently dropped it.
        super(Gui, self).__init__(parent)
        # other stuff

    def start_thread(self, fun, *args, **kwargs):
        """Create a runner for ``fun`` and move it to a new ``QThread``.

        Args:
            fun: Callable the runner will execute.
            *args: Positional arguments forwarded to ``fun``.
            **kwargs: Keyword arguments forwarded to ``fun``.
        """
        self.runner = ThreadRunner(fun, *args, **kwargs)
        self.thread = QtCore.QThread()
        self.runner.moveToThread(self.thread)
        # more stuff for catching results
        # NOTE(review): nothing visible here starts the thread or emits
        # ``runner.start``; presumably that is part of the elided code.
def slice_producer(data):
    """Yield a copy of ``data[:, r, ...]`` for every index ``r`` of axis 1.

    Args:
        data: Array-like with a ``shape`` of at least three dimensions.

    Yields:
        ``np.copy`` of each slice, so consumers never share memory with
        ``data``.

    Raises:
        ValueError: If ``data`` has fewer than three dimensions (the shape
            unpacking below fails).
    """
    # Only the size of axis 1 is needed; the three-way unpacking is kept so
    # inputs with fewer than three dimensions are still rejected.
    _, rows, _ = data.shape[:3]
    for r in range(rows):
        yield np.copy(data[:, r, ...])
def run_parallel(data, *args, **kwargs):
    """Run ``do_long_computation`` on every slice of ``data`` in parallel.

    Args:
        data: Array passed to ``slice_producer`` to obtain the work items.
        *args: Accepted but not used.
        **kwargs: Keyword arguments forwarded to every
            ``do_long_computation`` call.

    Returns:
        The value returned by the ``joblib.Parallel`` call.
    """
    executor = joblib.Parallel(
        n_jobs=4,
        verbose=12,
        pre_dispatch='1.5*n_jobs',
    )
    # Wrap once instead of once per slice.
    cached_computation = memory.cache(do_long_computation)
    # The original closed the ``Parallel(...)`` call before the generator, so
    # the generator was a separate, discarded expression and no job was ever
    # dispatched.  The Parallel object has to be called with the generator.
    results = executor(
        delayed(cached_computation)(data_slice, **kwargs)
        for data_slice in slice_producer(data)
    )
    # The original never returned the results it computed.
    return results
我希望它不要太长,同时也不要太模糊。我使用 pyqt4 4.11.3 和 joblib 0.8.4.
我再次检查我的代码并注意到以下警告:
UserWarning: Multiprocessing backed parallel loops cannot
be nested below threads, setting n_jobs=1
这样我的问题可以细化为:如何在单独的线程中运行 multiprocessing 进程?
好的,多亏了 ekhumoro,我找到了一个可行的方案:它只使用一个 mp.Pool 实例,并通过回调来工作。唯一的缺点是,子进程中的错误会悄无声息地失败(例如,把 f_wrapper 中的 results 改成 result 就能看到这种情况)。这里的代码供以后参考:
from PyQt4.QtCore import *
from PyQt4.QtGui import *
import multiprocessing
import sys
import numpy as np
import time
def f(data_slice, **kwargs):
    """Stand-in for a time-intensive function that must not be altered.

    Args:
        data_slice: Two-dimensional array indexed as ``[row, col]``.
        **kwargs: Accepted but not used.

    Returns:
        A 4-tuple: the sum of squared elements followed by the dummy values
        ``3, 5, 3``.
    """
    data = 0
    for row in range(data_slice.shape[0]):
        for col in range(data_slice.shape[1]):
            data += data_slice[row, col]**2
    # NOTE(review): the original nesting was lost; the delay is placed once
    # per call (a per-element sleep would take hours on the demo data).
    time.sleep(0.1)
    return data, 3, 5, 3  # some dummy calculation results
def f_wrapper(row, data_slice, **kwargs):
    """Run ``f`` on one slice and tag the result with its row index.

    Args:
        row: Index of the slice, returned unchanged.
        data_slice: Data forwarded to ``f``.
        **kwargs: Keyword arguments forwarded to ``f``.

    Returns:
        ``(row, results)`` where ``results`` is whatever ``f`` returned.
    """
    results = f(data_slice, **kwargs)
    return row, results
class MainWindow(QMainWindow):  # You can only add menus to QMainWindows
    """Demo window that farms rows out to a process pool and logs results."""

    # Carries one ``(row, result)`` tuple from the pool callback to
    # ``update_gui``.
    result_ready = pyqtSignal(object)

    def __init__(self):
        """Create the worker pool, build the widgets and show the window."""
        super(MainWindow, self).__init__()
        self.pool = multiprocessing.Pool(processes=4)
        # Pool callbacks run in the pool's result-handling thread, and Qt
        # widgets must only be touched from the GUI thread.  The callback
        # therefore only emits this signal and the connected slot does the
        # widget update.
        self.result_ready.connect(self.update_gui)

        button1 = QPushButton('Connect', self)
        button1.clicked.connect(self.apply_connection)
        self.text = QTextEdit()
        vbox1 = QVBoxLayout()
        vbox1.addWidget(button1)
        vbox1.addWidget(self.text)
        myframe = QFrame()
        myframe.setLayout(vbox1)
        self.setCentralWidget(myframe)
        self.show()  # display and activate focus
        self.raise_()

    def apply_connection(self):
        """Generate random data and submit one pool task per row."""
        self.rows_processed = list()
        self.max_size = 1000
        data = np.random.random(size=(100, self.max_size, self.max_size))
        kwargs = {'some_kwarg': 1000}
        for row in range(data.shape[1]):
            data_slice = data[:, row, :]
            # Same text as the Python 2 ``print "...", row`` statement,
            # including the doubled space.
            print("starting f for row  %d" % row)
            # Calling ``.get()`` on the returned AsyncResult would block the
            # GUI, but it re-raises worker errors and so helps debugging.
            self.pool.apply_async(
                f_wrapper,
                args=(row, data_slice),
                kwds=kwargs,
                callback=self.result_ready.emit,
            )

    def update_gui(self, result):
        """Record one finished row and append a line to the text widget.

        Args:
            result: ``(row, func_result)`` tuple as returned by ``f_wrapper``.
        """
        row, func_result = result
        self.rows_processed.append(row)
        print(len(self.rows_processed))
        print(func_result)  # or do something more intelligent
        self.text.append('Applied connection. Row = %d\n' % row)
        if len(self.rows_processed) == self.max_size:
            print("Done!")
if __name__ == '__main__':
    # Script entry point: create the application and the main window, then
    # run the Qt event loop.
    app = QApplication(sys.argv)
    gui = MainWindow()
    # Hand the event loop's exit status to the shell instead of dropping it.
    sys.exit(app.exec_())
如果有一个很好的方法来捕获错误,那将是一个很好的奖励。
我正在开发一个 GUI,它需要执行一些繁重的数值计算(number crunching)。为了加快速度,我将 joblib 的并行执行与 PyQt 的 QThread 结合使用,以避免 GUI 失去响应。到目前为止,并行执行本身工作正常,但如果把它嵌入到 GUI 中并在它自己的线程中运行,它只使用我 4 个核心中的一个。我在线程/多进程(threading/multiprocessing)方面遗漏了什么基础知识?
这是我的设置的粗略草图:
class ThreadRunner(QtCore.QObject):
    """Worker object that runs one callable and emits its result.

    ``__init__`` connects the ``start`` signal to :meth:`run`, so emitting
    ``start`` triggers the stored call.
    """

    # Emitted to trigger ``run``.
    start = QtCore.pyqtSignal()
    # Carries the callable's return value; declared with an ``np.ndarray``
    # payload.
    result_finished = QtCore.pyqtSignal(np.ndarray)

    def __init__(self, function, *args, **kwargs):
        """Store the callable and its arguments for later execution.

        Args:
            function: Callable invoked by ``run``.
            *args: Positional arguments forwarded to ``function``.
            **kwargs: Keyword arguments forwarded to ``function``.
        """
        # The original passed ``DispMapRunner`` to ``super``; that name is not
        # defined anywhere in the visible code, so use this class instead.
        super(ThreadRunner, self).__init__()
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.start.connect(self.run)

    def run(self):
        """Call the stored function and emit its result."""
        # Parenthesised single-argument print works on Python 2 and 3 alike.
        print("New Thread started")
        result = self.function(*self.args, **self.kwargs)
        self.result_finished.emit(result)
class Gui(QtGui.QMainWindow, form_class):
    """Main window that hands work to a ``ThreadRunner`` on a ``QThread``."""

    def __init__(self, cl_args, parent=None):
        """Create the window.

        Args:
            cl_args: Not used by the visible code.
            parent: Optional parent widget, forwarded to the Qt base class.
        """
        # The original accepted ``parent`` but silently dropped it.
        super(Gui, self).__init__(parent)
        # other stuff

    def start_thread(self, fun, *args, **kwargs):
        """Create a runner for ``fun`` and move it to a new ``QThread``.

        Args:
            fun: Callable the runner will execute.
            *args: Positional arguments forwarded to ``fun``.
            **kwargs: Keyword arguments forwarded to ``fun``.
        """
        self.runner = ThreadRunner(fun, *args, **kwargs)
        self.thread = QtCore.QThread()
        self.runner.moveToThread(self.thread)
        # more stuff for catching results
        # NOTE(review): nothing visible here starts the thread or emits
        # ``runner.start``; presumably that is part of the elided code.
def slice_producer(data):
    """Yield a copy of ``data[:, r, ...]`` for every index ``r`` of axis 1.

    Args:
        data: Array-like with a ``shape`` of at least three dimensions.

    Yields:
        ``np.copy`` of each slice, so consumers never share memory with
        ``data``.

    Raises:
        ValueError: If ``data`` has fewer than three dimensions (the shape
            unpacking below fails).
    """
    # Only the size of axis 1 is needed; the three-way unpacking is kept so
    # inputs with fewer than three dimensions are still rejected.
    _, rows, _ = data.shape[:3]
    for r in range(rows):
        yield np.copy(data[:, r, ...])
def run_parallel(data, *args, **kwargs):
    """Run ``do_long_computation`` on every slice of ``data`` in parallel.

    Args:
        data: Array passed to ``slice_producer`` to obtain the work items.
        *args: Accepted but not used.
        **kwargs: Keyword arguments forwarded to every
            ``do_long_computation`` call.

    Returns:
        The value returned by the ``joblib.Parallel`` call.
    """
    executor = joblib.Parallel(
        n_jobs=4,
        verbose=12,
        pre_dispatch='1.5*n_jobs',
    )
    # Wrap once instead of once per slice.
    cached_computation = memory.cache(do_long_computation)
    # The original closed the ``Parallel(...)`` call before the generator, so
    # the generator was a separate, discarded expression and no job was ever
    # dispatched.  The Parallel object has to be called with the generator.
    results = executor(
        delayed(cached_computation)(data_slice, **kwargs)
        for data_slice in slice_producer(data)
    )
    # The original never returned the results it computed.
    return results
我希望它不要太长,同时也不要太模糊。我使用 pyqt4 4.11.3 和 joblib 0.8.4.
我再次检查我的代码并注意到以下警告:
UserWarning: Multiprocessing backed parallel loops cannot
be nested below threads, setting n_jobs=1
这样我的问题可以细化为:如何在单独的线程中运行 multiprocessing 进程?
好的,多亏了 ekhumoro,我找到了一个可行的方案:它只使用一个 mp.Pool 实例,并通过回调来工作。唯一的缺点是,子进程中的错误会悄无声息地失败(例如,把 f_wrapper 中的 results 改成 result 就能看到这种情况)。这里的代码供以后参考:
from PyQt4.QtCore import *
from PyQt4.QtGui import *
import multiprocessing
import sys
import numpy as np
import time
def f(data_slice, **kwargs):
    """Stand-in for a time-intensive function that must not be altered.

    Args:
        data_slice: Two-dimensional array indexed as ``[row, col]``.
        **kwargs: Accepted but not used.

    Returns:
        A 4-tuple: the sum of squared elements followed by the dummy values
        ``3, 5, 3``.
    """
    data = 0
    for row in range(data_slice.shape[0]):
        for col in range(data_slice.shape[1]):
            data += data_slice[row, col]**2
    # NOTE(review): the original nesting was lost; the delay is placed once
    # per call (a per-element sleep would take hours on the demo data).
    time.sleep(0.1)
    return data, 3, 5, 3  # some dummy calculation results
def f_wrapper(row, data_slice, **kwargs):
    """Run ``f`` on one slice and tag the result with its row index.

    Args:
        row: Index of the slice, returned unchanged.
        data_slice: Data forwarded to ``f``.
        **kwargs: Keyword arguments forwarded to ``f``.

    Returns:
        ``(row, results)`` where ``results`` is whatever ``f`` returned.
    """
    results = f(data_slice, **kwargs)
    return row, results
class MainWindow(QMainWindow):  # You can only add menus to QMainWindows
    """Demo window that farms rows out to a process pool and logs results."""

    # Carries one ``(row, result)`` tuple from the pool callback to
    # ``update_gui``.
    result_ready = pyqtSignal(object)

    def __init__(self):
        """Create the worker pool, build the widgets and show the window."""
        super(MainWindow, self).__init__()
        self.pool = multiprocessing.Pool(processes=4)
        # Pool callbacks run in the pool's result-handling thread, and Qt
        # widgets must only be touched from the GUI thread.  The callback
        # therefore only emits this signal and the connected slot does the
        # widget update.
        self.result_ready.connect(self.update_gui)

        button1 = QPushButton('Connect', self)
        button1.clicked.connect(self.apply_connection)
        self.text = QTextEdit()
        vbox1 = QVBoxLayout()
        vbox1.addWidget(button1)
        vbox1.addWidget(self.text)
        myframe = QFrame()
        myframe.setLayout(vbox1)
        self.setCentralWidget(myframe)
        self.show()  # display and activate focus
        self.raise_()

    def apply_connection(self):
        """Generate random data and submit one pool task per row."""
        self.rows_processed = list()
        self.max_size = 1000
        data = np.random.random(size=(100, self.max_size, self.max_size))
        kwargs = {'some_kwarg': 1000}
        for row in range(data.shape[1]):
            data_slice = data[:, row, :]
            # Same text as the Python 2 ``print "...", row`` statement,
            # including the doubled space.
            print("starting f for row  %d" % row)
            # Calling ``.get()`` on the returned AsyncResult would block the
            # GUI, but it re-raises worker errors and so helps debugging.
            self.pool.apply_async(
                f_wrapper,
                args=(row, data_slice),
                kwds=kwargs,
                callback=self.result_ready.emit,
            )

    def update_gui(self, result):
        """Record one finished row and append a line to the text widget.

        Args:
            result: ``(row, func_result)`` tuple as returned by ``f_wrapper``.
        """
        row, func_result = result
        self.rows_processed.append(row)
        print(len(self.rows_processed))
        print(func_result)  # or do something more intelligent
        self.text.append('Applied connection. Row = %d\n' % row)
        if len(self.rows_processed) == self.max_size:
            print("Done!")
if __name__ == '__main__':
    # Script entry point: create the application and the main window, then
    # run the Qt event loop.
    app = QApplication(sys.argv)
    gui = MainWindow()
    # Hand the event loop's exit status to the shell instead of dropping it.
    sys.exit(app.exec_())
如果有一个很好的方法来捕获错误,那将是一个很好的奖励。