Is it possible to use concurrent.futures to execute a function/method inside a tkinter class following an event? If yes, how?

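I am trying to use the pool of workers provided by concurrent.futures.ProcessPoolExecutor to speed up a method in a tkinter class. The method is CPU-intensive, so parallelizing it should shorten the time it takes to complete. I want to benchmark its performance against a control: a serial execution of the same method. I have written a tkinter GUI test script to perform this benchmark. The serial execution of the method works, but the concurrent part does not. I would appreciate any help getting the concurrent part of my code to work.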

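Update: I have confirmed that I implemented concurrent.futures.ProcessPoolExecutor correctly for this problem outside of Tk(), i.e. from a standard python3 script. This is explained in another answer. Now I want to use the concurrent approach described in that answer behind a button in my tkinter.Tk() GUI.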

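My test code is given below. When you run it, a GUI appears. When you click the 'FIND' button, the _findmatch function is executed first serially and then concurrently to find how many times the number 5 occurs in the range 0 to nmax (1E7 in the code below). The serial part works, but the concurrent part complains (see below). Does anyone know how to fix this pickling error?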

Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/queues.py", line 241, in _feed
    obj = ForkingPickler.dumps(obj)
  File "/usr/lib/python3.5/multiprocessing/reduction.py", line 50, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <class '_tkinter.tkapp'>: attribute lookup tkapp on _tkinter failed

Test code:

#!/usr/bin/python3
# -*- coding: utf-8 -*-

import tkinter as tk # Python 3 tkinter modules
import tkinter.ttk as ttk
import concurrent.futures as cf
from time import time, sleep
from itertools import repeat, chain 

class App(ttk.Frame):
    def __init__(self, parent):
        # Initialise App Frame
        ttk.Frame.__init__(self, parent, style='App.TFrame')
        self.parent=parent

        self.button = ttk.Button(self, style='start.TButton', text = 'FIND',
                                 command=self._check)
        self.label0 = ttk.Label(self, foreground='blue')
        self.label1 = ttk.Label(self, foreground='red')
        self.label2 = ttk.Label(self, foreground='green')
        self._labels()
        self.button.grid(row=0, column=1, rowspan=3, sticky='nsew')
        self.label0.grid(row=0, column=0, sticky='nsew')
        self.label1.grid(row=1, column=0, sticky='nsew')
        self.label2.grid(row=2, column=0, sticky='nsew')

    def _labels(self):
        self.label0.configure(text='Click "FIND" to see how many times the number 5 appears.')
        self.label1.configure(text='Serial Method:')
        self.label2.configure(text='Concurrent Method:')

    def _check(self):
        # Initialisation
        self._labels()
        nmax = int(1E7)
        smatch=[]
        cmatch=[]
        number = '5'
        self.label0.configure(
            text='Finding the number of times {0} appears in 0 to {1}'.format(
                number, nmax))
        self.parent.update_idletasks()

        # Run serial code
        start = time()
        smatch = self._findmatch(0, nmax, number)
        end = time() - start
        self.label1.configure(
            text='Serial: Found {0} occurrences, Time to Find: {1:.6f}sec'.format(
                len(smatch), end))

        # Run serial code concurrently with concurrent.futures
        workers = 6     # Pool of workers
        chunks_vs_workers = 30 # A factor of >=14 can provide optimum performance
        num_of_chunks = chunks_vs_workers * workers
        start = time()
        cmatch = self._concurrent_map(nmax, number, workers, num_of_chunks)
        end = time() - start
        self.label2.configure(
            text='Concurrent: Found {0} occurrences, Time to Find: {1:.6f}sec'.format(
                len(cmatch), end))

    def _findmatch(self, nmin, nmax, number):
        '''Function to find the occurrences of number in range nmin to nmax and
           return the found occurrences in a list.'''
        start = time()
        match=[]
        for n in range(nmin, nmax):
            if number in str(n): match.append(n)
        end = time() - start
        #print("\n def _findmatch {0:<10} {1:<10} {2:<3} found {3:8} in {4:.4f}sec".
        #      format(nmin, nmax, number, len(match),end))
        return match

    def _concurrent_map(self, nmax, number, workers, num_of_chunks):
        '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
           find the occurrences of a given number in a number range in a concurrent
           manner.'''
        # 1. Local variables
        start = time()
        chunksize = nmax // num_of_chunks
        #2. Parallelization
        with cf.ProcessPoolExecutor(max_workers=workers) as executor:
            # 2.1. Discretise workload and submit to worker pool
            cstart = (chunksize * i for i in range(num_of_chunks))
            cstop = (chunksize * i if i != num_of_chunks else nmax
                     for i in range(1, num_of_chunks + 1))
            futures = executor.map(self._findmatch, cstart, cstop, repeat(number))
        end = time() - start
        print('\n within statement of def _concurrent_map(nmax, number, workers, num_of_chunks):')
        print("found in {0:.4f}sec".format(end))
        return list(chain.from_iterable(futures))


if __name__ == '__main__':
    root = tk.Tk()
    root.title('App'), root.geometry('550x60')
    app = App(root)
    app.grid(row=0, column=0, sticky='nsew')

    root.rowconfigure(0, weight=1)
    root.columnconfigure(0, weight=1)
    app.columnconfigure(0, weight=1)

    app.mainloop()

I finally found a way to answer my own question.

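Mark Summerfield's book, Python in Practice (2014), mentions that the multiprocessing module, which concurrent.futures.ProcessPoolExecutor relies on, can only call functions that are importable and can only use module data (used by those functions) that is picklable. The original code fails because executor.map(self._findmatch, ...) submits a bound method of App: pickling it also pickles self, and with it the underlying _tkinter.tkapp object, which cannot be pickled. It is therefore necessary to keep concurrent.futures.ProcessPoolExecutor and the functions it calls (and their arguments) separate from the tkinter GUI class; otherwise it will not work.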
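
The pickling requirement can be checked without a GUI. The following is a minimal sketch, not the original test code: Holder, find and can_pickle are hypothetical names, and a threading.Lock is assumed as a stand-in for the unpicklable _tkinter.tkapp object that a widget holds.

```python
import pickle
import threading


class Holder:
    """Stand-in for a GUI class: it owns a resource that cannot be pickled."""

    def __init__(self):
        # Hypothetical stand-in for the _tkinter.tkapp object held by a widget.
        self.resource = threading.Lock()

    def find(self, nmin, nmax, number):
        """Same search as _findmatch, but written as an instance method."""
        return [n for n in range(nmin, nmax) if number in str(n)]


def find(nmin, nmax, number):
    """Module-level version of the same work; it carries no instance state."""
    return [n for n in range(nmin, nmax) if number in str(n)]


def can_pickle(obj):
    """Return True if obj survives pickle.dumps, which a process pool requires."""
    try:
        pickle.dumps(obj)
    except Exception:
        return False
    return True


if __name__ == '__main__':
    # Bound method: pickling it also pickles the instance and its lock.
    print('bound method picklable:', can_pickle(Holder().find))
    # Plain function: pickled by reference to its importable name.
    print('module-level function picklable:', can_pickle(find))
```

The bound method drags its instance into the pickle, so anything unpicklable on the instance makes the whole submission fail, which matches the traceback in the question.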

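So, instead of putting them in class App, my tkinter.Tk() GUI class, as I did before, I created a separate class to host all the code related to concurrent.futures.ProcessPoolExecutor, together with the functions it calls and their data. It worked!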
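
A variation on the same idea is to use plain module-level functions rather than a host class. This is only a sketch under that assumption: find_matches, chunk_bounds and concurrent_find are hypothetical names, not part of the test code in this post.

```python
import concurrent.futures as cf
from itertools import chain


def find_matches(nmin, nmax, number):
    """Return every n in [nmin, nmax) whose decimal form contains number."""
    return [n for n in range(nmin, nmax) if number in str(n)]


def chunk_bounds(nmax, num_of_chunks):
    """Split [0, nmax) into num_of_chunks (start, stop) pairs.

    The last chunk absorbs the remainder so the whole range is covered.
    """
    chunksize = nmax // num_of_chunks
    return [(chunksize * i,
             chunksize * (i + 1) if i != num_of_chunks - 1 else nmax)
            for i in range(num_of_chunks)]


def concurrent_find(nmax, number, workers, num_of_chunks):
    """Search all chunks in a process pool and merge results in chunk order."""
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # Only a module-level function and plain ints/strs are submitted,
        # so nothing tied to the GUI has to be pickled.
        futures = [executor.submit(find_matches, lo, hi, number)
                   for lo, hi in chunk_bounds(nmax, num_of_chunks)]
        return list(chain.from_iterable(f.result() for f in futures))
```

A GUI callback could then call concurrent_find(nmax, '5', workers, num_of_chunks) without passing any widget or App instance to the pool. On platforms that start worker processes by spawning, the call should sit behind an if __name__ == '__main__': guard, as the test code already does for the GUI.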

I also managed to use threading.Thread to run my serial and concurrent tasks concurrently.
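
One caveat with worker threads is that tkinter widgets are generally meant to be touched only from the thread that runs mainloop. A common pattern, sketched here as an assumption rather than as what the code below does, is to have the threads put results on a queue.Queue and let the GUI thread poll it with widget.after. The names run_in_thread, drain and poll are hypothetical.

```python
import queue
import threading


def run_in_thread(func, args, results, tag):
    """Run func(*args) in a worker thread and put (tag, value) on the queue."""
    def worker():
        results.put((tag, func(*args)))

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    return thread


def drain(results, handlers):
    """Dispatch every queued (tag, value) to handlers[tag]; return the count."""
    handled = 0
    while True:
        try:
            tag, value = results.get_nowait()
        except queue.Empty:
            return handled
        handlers[tag](value)
        handled += 1


def poll(widget, results, handlers, interval_ms=100):
    """Drain the queue on the GUI thread, then reschedule via widget.after."""
    drain(results, handlers)
    widget.after(interval_ms, poll, widget, results, handlers, interval_ms)
```

In a button callback, one would create results = queue.Queue(), start the serial and concurrent jobs with run_in_thread, and call poll(self, results, handlers) once, where handlers maps each tag to a function that configures the matching label. The labels are then only updated from the main thread.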

I share my revised test code below to demonstrate how I did it, in the hope that it helps anyone trying to use concurrent.futures with tkinter.

It is really great to see all the CPUs firing up from a Tk GUI. :)

Revised test code:

#!/usr/bin/python3
# -*- coding: utf-8 -*-
''' Code to demonstrate how to use concurrent.futures.Executor object with tkinter.'''

import tkinter as tk # Python 3 tkinter modules
import tkinter.ttk as ttk
import concurrent.futures as cf
import threading
from time import time, sleep
from itertools import chain 


class App(ttk.Frame):
    def __init__(self, parent):
        # Initialise App Frame
        ttk.Frame.__init__(self, parent)
        self.parent=parent

        self.button = ttk.Button(self, text = 'FIND', command=self._check)
        self.label0 = ttk.Label(self, foreground='blue')
        self.label1 = ttk.Label(self, foreground='red')
        self.label2 = ttk.Label(self, foreground='green')
        self._labels()
        self.button.grid(row=0, column=1, rowspan=3, sticky='nsew')
        self.label0.grid(row=0, column=0, sticky='nsew')
        self.label1.grid(row=1, column=0, sticky='nsew')
        self.label2.grid(row=2, column=0, sticky='nsew')

    def _labels(self):
        self.label0.configure(text='Click "FIND" to see how many times the number 5 appears.')
        self.label1.configure(text='Serial Method:')
        self.label2.configure(text='Concurrent Method:')

    def _check(self):
        # Initialisation
        self._labels()
        nmax = int(1E8)
        workers = 6     # Pool of workers
        chunks_vs_workers = 30 # A factor of >=14 can provide optimum performance
        num_of_chunks = chunks_vs_workers * workers
        number = '5'
        self.label0.configure(
            text='Finding the number of times {0} appears in 0 to {1}'.format(
                number, nmax))
        self.parent.update_idletasks()
        # Concurrent management of serial and concurrent tasks using threading
        self.serworker = threading.Thread(target=self._serial,
                                          args=(0, nmax, number))
        self.subworker  = threading.Thread(target=self._concurrent,
                                           args=(nmax, number, workers,
                                                 num_of_chunks))
        self.serworker.start()         
        self.subworker.start()         

    def _serial(self, nmin, nmax, number):
        fm = Findmatch
        # Run serial code
        start = time()
        smatch = fm._findmatch(fm, nmin, nmax, number)
        end = time() - start
        self.label1.configure(
            text='Serial Method: {0} occurrences, Compute Time: {1:.6f}sec'.format(
                len(smatch), end))
        self.parent.update_idletasks()
        #print('smatch = ', smatch) 

    def _concurrent(self, nmax, number, workers, num_of_chunks): 
        fm = Findmatch
        # Run serial code concurrently with concurrent.futures .submit()
        start = time()
        cmatch = fm._concurrent_submit(fm, nmax, number, workers,
                                        num_of_chunks)
        end = time() - start
        self.label2.configure(
            text='Concurrent Method: {0} occurrences, Compute Time: {1:.6f}sec'.format(
                len(cmatch), end))
        self.parent.update_idletasks()
        #print('cmatch = ', cmatch) 


class Findmatch:
    ''' A class specially created to host concurrent.futures.ProcessPoolExecutor
        so that the function(s) it calls can be accessible by multiprocessing
        module. Multiprocessing requirements: code must be importable and code
        data must be picklable. ref. Python in Practice, by Mark Summerfield,
        section 4.3.2, pg 173, 2014'''
    # This class is never instantiated: App calls its methods directly on the
    # class and passes the class itself as `self`, so no __init__ is needed.

    def _findmatch(self, nmin, nmax, number):
        '''Function to find the occurrences of number in range nmin to nmax and
           return the found occurrences in a list.'''
        start = time()
        match=[]
        for n in range(nmin, nmax):
            if number in str(n): match.append(n)
        end = time() - start
        #print("\n def _findmatch {0:<10} {1:<10} {2:<3} found {3:8} in {4:.4f}sec".
        #      format(nmin, nmax, number, len(match),end))
        return match

    def _concurrent_submit(self, nmax, number, workers, num_of_chunks):
        '''Function that utilises concurrent.futures.ProcessPoolExecutor.submit to
           find the occurrences of a given number in a number range in a concurrent
           manner.'''
        # 1. Local variables
        start = time()
        chunksize = nmax // num_of_chunks
        futures = []  # local list; avoids storing state on the class
        #2. Parallelization
        with cf.ProcessPoolExecutor(max_workers=workers) as executor:
            # 2.1. Discretise workload and submit to worker pool
            for i in range(num_of_chunks):
                cstart = chunksize * i
                cstop = chunksize * (i + 1) if i != num_of_chunks - 1 else nmax
                # self is the Findmatch class here, which pickles by reference
                futures.append(executor.submit(
                    self._findmatch, self, cstart, cstop, number))
        end = time() - start
        print('\n within statement of def _concurrent_submit(nmax, number, workers, num_of_chunks):')
        print("found in {0:.4f}sec".format(end))
        # as_completed yields in completion order, so matches are not sorted
        return list(chain.from_iterable(f.result() for f in cf.as_completed(
            futures)))


if __name__ == '__main__':
    root = tk.Tk()
    root.title('App'), root.geometry('550x60')
    app = App(root)
    app.grid(row=0, column=0, sticky='nsew')

    root.rowconfigure(0, weight=1)
    root.columnconfigure(0, weight=1)
    app.columnconfigure(0, weight=1)

    app.mainloop()