线程tkinter串口读取函数

Threading tkinter serial reading function

我正在制作一个应用程序,用于读取来自 arduino 板上传感器的串行数据。尝试使用 matplotlib.animation class 制作所述数据的实时图表时会出现问题。当绘图正在进行时,GUI 小部件变得无响应。据我所知,在自己的线程上进行串行读取过程 运行 可能会解决这个问题。我无法理解如何制作它以便它与 FuncAnimation-subclass.

兼容
def read_serial_data(port, bauds=9600):
    s = serial.Serial(port, bauds)
    line = s.readline()[0:-2] 
    return line

def getPorts():
    return [port.device for port in serial.tools.list_ports.comports(include_links=False)]

class GUI():
    def __init__(self):
        self.root = Tk.Tk()
        self._fig = plt.figure()

        self.root.title('Measurement Dashboard')
        self.root.state('normal')
        self.root.config(background='#ffffff')

        self._canvas = FigureCanvasTkAgg(self._fig, self.root)
        self._canvas.get_tk_widget().grid(column = 1, row = 1)
        self._canvas.draw()
        self._animate = None

        self._ax = self._fig.add_subplot(111)       
        self._ax.yaxis.grid(True, color = 'black', linestyle='--')
        self._ax.xaxis.grid(True, color = 'black', linestyle='--')
        self._ax.set_xlabel('time')
        self._ax.set_ylabel('CO2')
        
        self.filename = Tk.StringVar()
        self.entry = ttk.Entry(self.root, textvariable = self.filename)
        self.entry.grid(column = 2, row = 2)

        self.info_var = Tk.StringVar()
        self.info_entry = ttk.Entry(self.root, textvariable = self.info_var)
        self.info_entry.grid(column = 2, row = 3)

        self.port = Tk.StringVar()
        self.ports = getPorts()
        self._cb = ttk.Combobox(self.root, textvariable= self.port, values = self.ports)
        self._cb.grid(column = 2, row = 1)

        self.start_button = Tk.Button(self.root, text = 'Start', command = self.plot)
        self.start_button.grid(column = 1, row = 2) 

        self.save_button = Tk.Button(self.root, text = 'Save info', command = self.save_info)
        self.save_button.grid(column = 2, row = 4)

    def save_info(self):
        global info
        info = self.info_var.get()

    def start(self):
        self.root.mainloop()

    def plot(self):
        if self._animate is None:
            self.scope = Scope(self._ax, self.filename.get())
            self._canvas.draw_idle()
            self._animate = animation.FuncAnimation(self._fig, self.scope.animate, frames = self.update, interval=2000, blit=False)            

    def update(self): 
        line = read_serial_data(self.port.get())
        data = line.decode('utf-8')
        
        yield data

        time = datetime.now()
        duration = time - start_time

        measurement = {'time': time, 'dur': duration.seconds, 'CO2': data, 'info': info}     
        write_csv_line(self.filename.get(), measurement)
        self.root.after(10000, self.update)

if __name__ == "__main__":
    gui = GUI()
    gui.start()
    thread = Thread(target=read_serial_data,args=(gui.port,))
    thread.start()

您实际上并不需要另一个线程,但可以在串行端口上使用 non-blocking IO 并使用 Tkinter after 调用来管理轮询间隔。 pyserial provices inWaiting 测试设备是否有等待读取的输入。如果有字节等待,只读那些。

这里是一个示例 reader class 从串行设备读取行,并在读取完整行后 post 将它们发送到应用程序处理程序方法。

class Reader(Serial):
    def __init__(self, *args, **kwargs):
        self.buffer = ''
        super(Reader, self).__init__(*args, **kwargs)
    def monitor(self, w, interval=10):
        n = self.inWaiting()
        if n != 0:
            data = self.read(n).decode('ascii')
            self.buffer = self.buffer + data
            ndx = self.buffer.find("\n")
            if ndx != -1:
                line = self.buffer[:ndx+1]
                self.buffer = self.buffer[ndx+1:]
                w.after_idle(lambda: w.parse_input(line))
        w.after(interval, lambda: self.monitor(w, interval))

像这样使用:

app = <tkinter application class instance>
reader = Reader(port, baudrate, timeout=0)
reader.flushInput()
reader.monitor(app, 10)
app.mainloop()

在这种情况下,每当读取一行(由换行符分隔)时,它将在 app 实例上调用 parse_input 方法。

如果您决定使用线程,那么您需要一个队列来将数据传递给 Tkinter UI 线程并且必须确保您不从工作线程调用 Tkinter 方法。