wxpython中的线程安全

Thread safety in wxpython

我正在使用 wxpython 为分析和处理音频文件的命令行工具构建前端 GUI。文件被加载到 GUI 中;然后启动执行分析和调整步骤的线程;最后这些过程的结果显示在主 window 中。

我努力编写线程安全的代码;然而,一些线程仍然无法完成(应该注意的是,当我第二次手动启动它们时,它们通常 运行 完成)。下面我包含了我的程序的删节版本,其中包含 类 用于 AnalysisThread、AdjustThread 和 MainWindow。主 window 中的按钮绑定到函数 "OnAnalyze" 和 "OnAdjust,",它们创建适当线程 类 的实例。线程本身通过 wx.CallAfter 和 Publisher 与 GUI 通信。据我了解,这应该允许数据在主进程和线程之间安全地来回传递。如果有人能指出我下面的代码哪里出了问题,我将不胜感激。

如果我无法修复线程安全问题,我的后备计划是以某种方式检测线程的死亡并尝试在幕后 "resuscitate" 它,而不让用户知道存在故障。这看起来合理吗?如果是这样,将非常欢迎有关如何实现此目的的建议。

非常感谢。

#!/usr/bin/python

import wx
import time
from threading import Thread
import os, sys, re, subprocess, shutil
from wx.lib.pubsub import setuparg1
from wx.lib.pubsub import pub as Publisher


#Start a thread that analyzes audio files.
class AnalysisThread(Thread):
    def __init__(self,args):
        Thread.__init__(self)
        self.file = args[0]
        self.index = args[1]
        self.setDaemon(True)
        self.start()

    def run(self):
       proc = subprocess.Popen(['ffmpeg', '-nostats', '-i', self.file, '-filter_complex', 'ebur128=peak=true+sample', '-f', 'null', '-'], bufsize=1, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
       flag = 0
       summary = ""
       while proc.poll() is None:
          line = proc.stdout.readline()
          if line:
             endProcess = re.search(r'Summary', line)
             if endProcess is not None:
                flag = 1
             if flag:
                summary += line

       wx.CallAfter(Publisher.sendMessage, "update", (self.file, summary, self.index))

#Start a thread that adjusts audio files so that they conform to EBU loudness standards.
class AdjustThread(Thread):
    def __init__(self,args):
        Thread.__init__(self)
        self.file = args[0]
        self.index = args[1]
        self.IL = args[2]
        self.TP = args[3]
        self.SP = args[4]
        self.setDaemon(True)
        self.start()

    def run(self):

       proc = subprocess.Popen(['ffmpeg', '-nostats', '-i', adjusted_file, '-filter_complex', 'ebur128=peak=true+sample', '-f', 'null', '-'], bufsize=1, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
       flag = 0
       summary = ""
       while proc.poll() is None:
          line = proc.stdout.readline()
          if line:
             endProcess = re.search(r'Summary', line)
             if endProcess is not None:
                flag = 1
             if flag:
                summary += line

       wx.CallAfter(Publisher.sendMessage, "update", (self.file, summary, self.index))


class MainWindow(wx.Frame):

    fileList = collections.OrderedDict()

    def __init__(self, parent, id, title):
        wx.Frame.__init__(self, parent, id, title, size=(900, 400))

        Publisher.subscribe(self.UpdateDisplay, "update")

        #Add "analyze" and "Adjust" buttons to the main frame.
        panel = wx.Panel(self, -1)
        vbox = wx.BoxSizer(wx.VERTICAL)
        self.ana = wx.Button(panel, -1, 'Analyze', size=(100, -1))
        self.adj = wx.Button(panel, -1, 'Adjust', size=(100, -1))
        self.Bind(wx.EVT_BUTTON, self.OnAnalyze, id=self.ana.GetId())
        self.Bind(wx.EVT_BUTTON, self.OnAdjust, id=self.adj.GetId())
        vbox.Add(self.ana, 0, wx.ALL, 10)
        vbox.Add(self.adj, 0, wx.ALL, 10)
        vbox.Add(self.list, 1, wx.EXPAND | wx.TOP, 3)
        vbox.Add((-1, 10))
        panel.SetSizer(hbox)

        self.Centre()
        self.Show(True)

    #This function gets called when "Analyze" is pressed.
    def OnAnalyze(self, event):

        for (file,index) in toAnalyze:

           #Add a progess bar
           item = self.list.GetItem(index,2)
           gauge = item.GetWindow()
           gauge.Pulse()

           #Launch the analysis thread
           AnalysisThread(args=(file,index,))

    #This function gets called when "Adjust" is pressed.
    def OnAdjust(self, event):

        for (file,index) in toAdjust:
           gauge = wx.Gauge(self.list,-1,range=50,size=(width,15),style=wx.GA_HORIZONTAL | wx.GA_SMOOTH)
           gauge.Pulse() #shouldn't start this right away...
           item.SetWindow(gauge, wx.ALIGN_CENTRE)
           self.list.SetItem(item) 

           #Launch the adjust thread
           AdjustThread(args=(file,index,intloud,truepeak,samplepeak))

    #This function is invoked by the Publisher.
    def UpdateDisplay(self, msg):
       t = msg.data 
       file = t[0]
       summary = t[1]
       i = t[2]
       self.ProcessSummary(file, summary, i)  
       item = self.list.GetItem(i,2)
       gauge = item.GetWindow()
       gauge.SetValue(50)
       self.fileList[file][1] = True

    #Display information from the threads in the main frame.
    def ProcessSummary(self, file, summary, i):
      loudnessRange = re.search(r'LRA:\s(.+?) LU', summary)
      if loudnessRange is not None:
         LRA = loudnessRange.group(1)
      else:
         LRA = "n/a"
      self.list.SetStringItem(i,7,LRA)         
      self.fileList[file][6] = LRA

      intloud = re.search(r'I:\s(.+?) LUFS', summary)
      if intloud is not None:
         IL = intloud.group(1)
      else:
         IL = "n/a"
      self.list.SetStringItem(i,4,IL)
      self.fileList[file][3] = IL

      truePeak = re.search(r'True peak:\s+Peak:\s(.+?) dBFS', summary)
      if truePeak is not None:
          TP = truePeak.group(1)
      else:
          TP = "n/a"
      self.list.SetStringItem(i,5,TP)
      self.fileList[file][4] = TP

      samplePeak = re.search(r'Sample peak:\s+Peak:\s(.+?) dBFS', summary)
      if samplePeak is not None:
          SP = samplePeak.group(1)
      else:
          SP = "n/a"
      self.list.SetStringItem(i,6,SP)
      self.fileList[file][5] = SP


app = wx.App()
MainWindow(None, -1, 'Leveler')
app.MainLoop()

这是从 2.8 系列的 wxPython 演示中获取的 delayedresult 示例代码。我修改了这段代码以满足我的需要。

顺便说一下,wxPython 演示是学习 wx 的宝贵资源。我强烈推荐。通过演示,wxPython 学习起来很有趣!

看来版本 3 现已正式发布,因此稳定。但如果您使用的是旧版本,您可以在此处找到演示:http://sourceforge.net/projects/wxpython/files/wxPython/

import wx
import wx.lib.delayedresult as delayedresult


class FrameSimpleDelayedBase(wx.Frame):
    def __init__(self, *args, **kwds):
        wx.Frame.__init__(self, *args, **kwds)
        pnl = wx.Panel(self)
        self.checkboxUseDelayed = wx.CheckBox(pnl, -1, "Using delayedresult")
        self.buttonGet = wx.Button(pnl, -1, "Get")
        self.buttonAbort = wx.Button(pnl, -1, "Abort")
        self.slider = wx.Slider(pnl, -1, 0, 0, 10, size=(100,-1),
                                style=wx.SL_HORIZONTAL|wx.SL_AUTOTICKS)
        self.textCtrlResult = wx.TextCtrl(pnl, -1, "", style=wx.TE_READONLY)

        self.checkboxUseDelayed.SetValue(1)
        self.checkboxUseDelayed.Enable(False)
        self.buttonAbort.Enable(False)

        vsizer = wx.BoxSizer(wx.VERTICAL)
        hsizer = wx.BoxSizer(wx.HORIZONTAL)
        vsizer.Add(self.checkboxUseDelayed, 0, wx.ALL, 10)
        hsizer.Add(self.buttonGet, 0, wx.ALL, 5)
        hsizer.Add(self.buttonAbort, 0, wx.ALL, 5)
        hsizer.Add(self.slider, 0, wx.ALL, 5)
        hsizer.Add(self.textCtrlResult, 0, wx.ALL, 5)
        vsizer.Add(hsizer, 0, wx.ALL, 5)
        pnl.SetSizer(vsizer)
        vsizer.SetSizeHints(self)

        self.Bind(wx.EVT_BUTTON, self.handleGet, self.buttonGet)
        self.Bind(wx.EVT_BUTTON, self.handleAbort, self.buttonAbort)




class FrameSimpleDelayed(FrameSimpleDelayedBase):
    """This demos simplistic use of delayedresult module."""

    def __init__(self, *args, **kwargs):
        FrameSimpleDelayedBase.__init__(self, *args, **kwargs)
        self.jobID = 0
        self.abortEvent = delayedresult.AbortEvent()
        self.Bind(wx.EVT_CLOSE, self.handleClose)

    def setLog(self, log):
        self.log = log

    def handleClose(self, event):
        """Only needed because in demo, closing the window does not kill the 
        app, so worker thread continues and sends result to dead frame; normally
        your app would exit so this would not happen."""
        if self.buttonAbort.IsEnabled():
            self.log( "Exiting: Aborting job %s" % self.jobID )
            self.abortEvent.set()
        self.Destroy()

    def handleGet(self, event): 
        """Compute result in separate thread, doesn't affect GUI response."""
        self.buttonGet.Enable(False)
        self.buttonAbort.Enable(True)
        self.abortEvent.clear()
        self.jobID += 1

        self.log( "Starting job %s in producer thread: GUI remains responsive"
                  % self.jobID )
        delayedresult.startWorker(self._resultConsumer, self._resultProducer, 
                                  wargs=(self.jobID,self.abortEvent), jobID=self.jobID)


    def _resultProducer(self, jobID, abortEvent):
        """Pretend to be a complex worker function or something that takes 
        long time to run due to network access etc. GUI will freeze if this 
        method is not called in separate thread."""
        import time
        count = 0
        while not abortEvent() and count < 50:
            time.sleep(0.1)
            count += 1
        return jobID


    def handleAbort(self, event): 
        """Abort the result computation."""
        self.log( "Aborting result for job %s" % self.jobID )
        self.buttonGet.Enable(True)
        self.buttonAbort.Enable(False)
        self.abortEvent.set()


    def _resultConsumer(self, delayedResult):
        jobID = delayedResult.getJobID()
        assert jobID == self.jobID
        try:
            result = delayedResult.get()
        except Exception, exc:
            self.log( "Result for job %s raised exception: %s" % (jobID, exc) )
            return

        # output result
        self.log( "Got result for job %s: %s" % (jobID, result) )
        self.textCtrlResult.SetValue(str(result))

        # get ready for next job:
        self.buttonGet.Enable(True)
        self.buttonAbort.Enable(False)


class FrameSimpleDirect(FrameSimpleDelayedBase):
    """This does not use delayedresult so the GUI will freeze while
    the GET is taking place."""

    def __init__(self, *args, **kwargs):
        self.jobID = 1
        FrameSimpleDelayedBase.__init__(self, *args, **kwargs)
        self.checkboxUseDelayed.SetValue(False)

    def setLog(self, log):
        self.log = log

    def handleGet(self, event): 
        """Use delayedresult, this will compute result in separate
        thread, and will affect GUI response because a thread is not
        used."""
        self.buttonGet.Enable(False)
        self.buttonAbort.Enable(True)

        self.log( "Doing job %s without delayedresult (same as GUI thread): GUI hangs (for a while)" % self.jobID )
        result = self._resultProducer(self.jobID)
        self._resultConsumer( result )

    def _resultProducer(self, jobID):
        """Pretend to be a complex worker function or something that takes 
        long time to run due to network access etc. GUI will freeze if this 
        method is not called in separate thread."""
        import time
        time.sleep(5)
        return jobID

    def handleAbort(self, event):
        """can never be called"""
        pass

    def _resultConsumer(self, result):
        # output result
        self.log( "Got result for job %s: %s" % (self.jobID, result) )
        self.textCtrlResult.SetValue(str(result))

        # get ready for next job:
        self.buttonGet.Enable(True)
        self.buttonAbort.Enable(False)
        self.jobID += 1


#---------------------------------------------------------------------------
#---------------------------------------------------------------------------

class TestPanel(wx.Panel):
    def __init__(self, parent, log):
        self.log = log
        wx.Panel.__init__(self, parent, -1)

        vsizer = wx.BoxSizer(wx.VERTICAL)
        b = wx.Button(self, -1, "Long-running function in separate thread")
        vsizer.Add(b, 0, wx.ALL, 5)
        self.Bind(wx.EVT_BUTTON, self.OnButton1, b)

        b = wx.Button(self, -1, "Long-running function in GUI thread")
        vsizer.Add(b, 0, wx.ALL, 5)
        self.Bind(wx.EVT_BUTTON, self.OnButton2, b)

        bdr = wx.BoxSizer()
        bdr.Add(vsizer, 0, wx.ALL, 50)
        self.SetSizer(bdr)
        self.Layout()

    def OnButton1(self, evt):
        frame = FrameSimpleDelayed(self, title="Long-running function in separate thread")
        frame.setLog(self.log.WriteText)
        frame.Show()

    def OnButton2(self, evt):
        frame = FrameSimpleDirect(self, title="Long-running function in GUI thread")
        frame.setLog(self.log.WriteText)
        frame.Show()