如果从 QThread 启动,Joblib Parallel 仅使用一个内核

Joblib Parallel uses only one core if started from QThread

我正在开发一个 GUI,它需要执行一些繁重的数据处理(data crunching)。为了加快速度,我把 joblib 的并行执行与 PyQt 的 QThread 结合使用,以避免 GUI 失去响应。到目前为止,并行执行本身工作正常,但一旦嵌入到 GUI 中并在它自己的线程里运行,它就只使用我 4 个核心中的一个。我是不是遗漏了 threading/multiprocessing 方面的某些基本知识?

这是我的设置的粗略草图:

 class ThreadRunner(QtCore.QObject):
    """Worker object that runs one callable and publishes its return value.

    The callable and its arguments are stored at construction time.  Emitting
    ``start`` invokes ``run``, which calls the stored function and emits the
    return value through ``result_finished``.
    """

    # Emitting this signal triggers ``run`` (connected in ``__init__``).
    start = QtCore.pyqtSignal()
    # Carries the return value of the wrapped function; declared as ndarray.
    result_finished = QtCore.pyqtSignal(np.ndarray)

    def __init__(self, function, *args, **kwargs):
        """Store the work to perform and wire ``start`` to ``run``.

        Args:
            function: Callable executed by ``run``.
            *args: Positional arguments passed to ``function``.
            **kwargs: Keyword arguments passed to ``function``.
        """
        # super() has to be given this class; any other name raises
        # NameError before the QObject base is initialised.
        super(ThreadRunner, self).__init__()

        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.start.connect(self.run)

    def run(self):
        """Call the stored function and emit its result."""
        # Single-argument parenthesised form prints the same text on
        # Python 2 and Python 3.
        print("New Thread started")
        result = self.function(*self.args, **self.kwargs)
        self.result_finished.emit(result)

class Gui(QtGui.QMainWindow, form_class):
    """Main window that hands callables to a ``ThreadRunner`` on a ``QThread``."""

    def __init__(self, cl_args, parent=None):
        """Initialise the window.

        Args:
            cl_args: Not used by the code shown here.
            parent: Optional parent widget, forwarded to the Qt base class.
        """
        # Forward ``parent`` instead of silently dropping it, so the window
        # takes part in Qt's parent/child ownership when one is supplied.
        super(Gui, self).__init__(parent)
        #other stuff

    def start_thread(self, fun, *args, **kwargs):
        """Create a runner for ``fun`` and move it to a fresh ``QThread``.

        The runner and the thread are stored on ``self`` so that they are
        not garbage-collected while in use.  This method does not start the
        thread or emit the runner's ``start`` signal.

        Args:
            fun: Callable the runner should execute.
            *args: Positional arguments for ``fun``.
            **kwargs: Keyword arguments for ``fun``.
        """
        self.runner = ThreadRunner(fun, *args, **kwargs)
        self.thread = QtCore.QThread()
        self.runner.moveToThread(self.thread)
        # more stuff for catching results

def slice_producer(data):
    """Yield an independent copy of ``data[:, r, ...]`` for each index ``r``.

    Args:
        data: Array with at least two dimensions; axis 1 is iterated.

    Yields:
        A copy of the slice at each position along axis 1, so that consumers
        never share memory with ``data``.
    """
    rows = data.shape[1]
    for r in range(rows):
        yield np.copy(data[:, r, ...])


def run_parallel(data, *args, **kwargs):
    """Run ``do_long_computation`` on every slice of ``data`` through joblib.

    Args:
        data: Array handed to ``slice_producer``.
        *args: Accepted for call compatibility; not used.
        **kwargs: Forwarded unchanged to every ``do_long_computation`` call.

    Returns:
        The list produced by ``joblib.Parallel``: one entry per slice.
    """
    # NOTE(review): joblib 0.8.x is reported to warn "Multiprocessing backed
    # parallel loops cannot be nested below threads" and to fall back to
    # n_jobs=1 when this is called from a non-main thread -- confirm against
    # the joblib version in use.
    # Wrap the computation once rather than once per slice.
    cached_computation = memory.cache(do_long_computation)
    parallel = joblib.Parallel(
        n_jobs=4,
        verbose=12,
        pre_dispatch='1.5*n_jobs',
    )
    # The Parallel instance must be *called* with the task generator; keeping
    # the two in one expression is what actually dispatches the work.
    return parallel(
        delayed(cached_computation)(data_slice, **kwargs)
        for data_slice in slice_producer(data)
    )

我希望它不要太长,同时也不要太模糊。我使用 pyqt4 4.11.3 和 joblib 0.8.4.

我再次检查我的代码并注意到以下警告:

UserWarning: Multiprocessing backed parallel loops cannot 
be nested below threads, setting n_jobs=1

于是我的问题可以细化为:如何在单独的线程中运行多进程(multiprocessing)任务?

好的,多亏了 ekhumoro,我找到了一个可行的办法:只使用一个 multiprocessing.Pool 实例,并配合回调函数。唯一的缺点是,子进程中的错误会悄无声息地被吞掉(例如,把 f_wrapper 中的 results 改成 result 就能复现)。代码放在这里供以后参考:

from PyQt4.QtCore import *
from PyQt4.QtGui import *
import multiprocessing
import sys
import numpy as np
import time

def f(data_slice, **kwargs):
    """Stand-in for an expensive computation that is meant to be used as-is.

    Adds up the squares of every ``data_slice[i, j]`` entry, sleeps for
    0.1 seconds and returns that sum followed by three fixed dummy values.
    Keyword arguments are accepted but ignored.
    """
    total = 0
    for i in range(data_slice.shape[0]):
        for j in range(data_slice.shape[1]):
            total += data_slice[i, j] ** 2
    time.sleep(0.1)
    # some dummy calculation results
    dummy_results = (total, 3, 5, 3)
    return dummy_results


def f_wrapper(row, data_slice,  **kwargs):
    """Run ``f`` on one slice and tag the outcome with its row index.

    Returns a ``(row, results)`` pair so the receiver can tell which row a
    result belongs to.
    """
    return (row, f(data_slice, **kwargs))

class MainWindow(QMainWindow): #You can only add menus to QMainWindows
    """Demo window that farms per-row work out to a process pool.

    Pressing the button submits one ``f_wrapper`` task per row to a
    ``multiprocessing.Pool``.  Pool callbacks are executed on the pool's
    result-handling thread, not on the GUI thread, so each result is passed
    through the ``row_finished`` signal and widgets are only touched inside
    the connected slot.
    """

    # Carries one ``(row, results)`` tuple from the pool callback to the GUI.
    row_finished = pyqtSignal(object)

    def __init__(self):
        """Create the worker pool and build the button/text-box layout."""
        super(MainWindow, self).__init__()
        self.pool = multiprocessing.Pool(processes=4)
        # Queued delivery across threads keeps widget access on the GUI thread.
        self.row_finished.connect(self.update_gui)

        button1 = QPushButton('Connect', self)
        button1.clicked.connect(self.apply_connection)
        self.text = QTextEdit()

        vbox1 = QVBoxLayout()
        vbox1.addWidget(button1)
        vbox1.addWidget(self.text)
        myframe = QFrame()
        myframe.setLayout(vbox1)

        self.setCentralWidget(myframe)
        self.show() #display and activate focus
        self.raise_()

    def apply_connection(self):
        """Generate random data and submit one pool task per row."""
        self.rows_processed = []
        self.max_size = 1000
        data = np.random.random(size=(100, self.max_size, self.max_size))
        kwargs = {'some_kwarg': 1000}
        for row in range(data.shape[1]):
            data_slice = data[:, row, :]
            # Same text as the two-argument print statement would produce.
            print("starting f for row  %s" % (row,))
            # Calling .get() on the returned AsyncResult would re-raise
            # worker errors for debugging, but it blocks the GUI.
            self.pool.apply_async(f_wrapper,
                                  args=(row, data_slice),
                                  kwds=kwargs,
                                  # Emit only; the slot runs on the GUI thread.
                                  callback=self.row_finished.emit)

    @pyqtSlot(object)
    def update_gui(self, result):
        """Record one finished row and report it in the text box.

        Args:
            result: ``(row, func_result)`` tuple as returned by ``f_wrapper``.
        """
        row, func_result = result
        self.rows_processed.append(row)
        print(len(self.rows_processed))
        print(func_result)  # or do something more intelligent
        self.text.append('Applied connection. Row = %d\n' % row)
        if len(self.rows_processed) == self.max_size:
            print("Done!")




if __name__ == '__main__':
    # Build the application and main window, then hand control to Qt.
    app = QApplication(sys.argv)
    gui = MainWindow()
    # Propagate the event loop's exit status to the shell instead of
    # discarding it.
    sys.exit(app.exec_())

如果有一个很好的方法来捕获错误,那将是一个很好的奖励。