线程tkinter串口读取函数
Threading tkinter serial reading function
我正在制作一个应用程序,用于读取来自 arduino 板上传感器的串行数据。尝试使用 matplotlib.animation class 制作所述数据的实时图表时会出现问题。当绘图正在进行时,GUI 小部件变得无响应。据我所知,在自己的线程上进行串行读取过程 运行 可能会解决这个问题。我无法理解如何制作它以便它与 FuncAnimation-subclass.
兼容
def read_serial_data(port, bauds=9600):
s = serial.Serial(port, bauds)
line = s.readline()[0:-2]
return line
def getPorts():
return [port.device for port in serial.tools.list_ports.comports(include_links=False)]
class GUI():
def __init__(self):
self.root = Tk.Tk()
self._fig = plt.figure()
self.root.title('Measurement Dashboard')
self.root.state('normal')
self.root.config(background='#ffffff')
self._canvas = FigureCanvasTkAgg(self._fig, self.root)
self._canvas.get_tk_widget().grid(column = 1, row = 1)
self._canvas.draw()
self._animate = None
self._ax = self._fig.add_subplot(111)
self._ax.yaxis.grid(True, color = 'black', linestyle='--')
self._ax.xaxis.grid(True, color = 'black', linestyle='--')
self._ax.set_xlabel('time')
self._ax.set_ylabel('CO2')
self.filename = Tk.StringVar()
self.entry = ttk.Entry(self.root, textvariable = self.filename)
self.entry.grid(column = 2, row = 2)
self.info_var = Tk.StringVar()
self.info_entry = ttk.Entry(self.root, textvariable = self.info_var)
self.info_entry.grid(column = 2, row = 3)
self.port = Tk.StringVar()
self.ports = getPorts()
self._cb = ttk.Combobox(self.root, textvariable= self.port, values = self.ports)
self._cb.grid(column = 2, row = 1)
self.start_button = Tk.Button(self.root, text = 'Start', command = self.plot)
self.start_button.grid(column = 1, row = 2)
self.save_button = Tk.Button(self.root, text = 'Save info', command = self.save_info)
self.save_button.grid(column = 2, row = 4)
def save_info(self):
global info
info = self.info_var.get()
def start(self):
self.root.mainloop()
def plot(self):
if self._animate is None:
self.scope = Scope(self._ax, self.filename.get())
self._canvas.draw_idle()
self._animate = animation.FuncAnimation(self._fig, self.scope.animate, frames = self.update, interval=2000, blit=False)
def update(self):
line = read_serial_data(self.port.get())
data = line.decode('utf-8')
yield data
time = datetime.now()
duration = time - start_time
measurement = {'time': time, 'dur': duration.seconds, 'CO2': data, 'info': info}
write_csv_line(self.filename.get(), measurement)
self.root.after(10000, self.update)
if __name__ == "__main__":
gui = GUI()
gui.start()
thread = Thread(target=read_serial_data,args=(gui.port,))
thread.start()
您实际上并不需要另一个线程,但可以在串行端口上使用 non-blocking IO 并使用 Tkinter after
调用来管理轮询间隔。 pyserial provices inWaiting
测试设备是否有等待读取的输入。如果有字节等待,只读那些。
这里是一个示例 reader class 从串行设备读取行,并在读取完整行后 post 将它们发送到应用程序处理程序方法。
class Reader(Serial):
def __init__(self, *args, **kwargs):
self.buffer = ''
super(Reader, self).__init__(*args, **kwargs)
def monitor(self, w, interval=10):
n = self.inWaiting()
if n != 0:
data = self.read(n).decode('ascii')
self.buffer = self.buffer + data
ndx = self.buffer.find("\n")
if ndx != -1:
line = self.buffer[:ndx+1]
self.buffer = self.buffer[ndx+1:]
w.after_idle(lambda: w.parse_input(line))
w.after(interval, lambda: self.monitor(w, interval))
像这样使用:
app = <tkinter application class instance>
reader = Reader(port, baudrate, timeout=0)
reader.flushInput()
reader.monitor(app, 10)
app.mainloop()
在这种情况下,每当读取一行(由换行符分隔)时,它将在 app
实例上调用 parse_input
方法。
如果您决定使用线程,那么您需要一个队列来将数据传递给 Tkinter UI 线程并且必须确保您不从工作线程调用 Tkinter 方法。
我正在制作一个应用程序,用于读取来自 arduino 板上传感器的串行数据。尝试使用 matplotlib.animation class 制作所述数据的实时图表时会出现问题。当绘图正在进行时,GUI 小部件变得无响应。据我所知,在自己的线程上进行串行读取过程 运行 可能会解决这个问题。我无法理解如何制作它以便它与 FuncAnimation-subclass.
兼容def read_serial_data(port, bauds=9600):
s = serial.Serial(port, bauds)
line = s.readline()[0:-2]
return line
def getPorts():
return [port.device for port in serial.tools.list_ports.comports(include_links=False)]
class GUI():
def __init__(self):
self.root = Tk.Tk()
self._fig = plt.figure()
self.root.title('Measurement Dashboard')
self.root.state('normal')
self.root.config(background='#ffffff')
self._canvas = FigureCanvasTkAgg(self._fig, self.root)
self._canvas.get_tk_widget().grid(column = 1, row = 1)
self._canvas.draw()
self._animate = None
self._ax = self._fig.add_subplot(111)
self._ax.yaxis.grid(True, color = 'black', linestyle='--')
self._ax.xaxis.grid(True, color = 'black', linestyle='--')
self._ax.set_xlabel('time')
self._ax.set_ylabel('CO2')
self.filename = Tk.StringVar()
self.entry = ttk.Entry(self.root, textvariable = self.filename)
self.entry.grid(column = 2, row = 2)
self.info_var = Tk.StringVar()
self.info_entry = ttk.Entry(self.root, textvariable = self.info_var)
self.info_entry.grid(column = 2, row = 3)
self.port = Tk.StringVar()
self.ports = getPorts()
self._cb = ttk.Combobox(self.root, textvariable= self.port, values = self.ports)
self._cb.grid(column = 2, row = 1)
self.start_button = Tk.Button(self.root, text = 'Start', command = self.plot)
self.start_button.grid(column = 1, row = 2)
self.save_button = Tk.Button(self.root, text = 'Save info', command = self.save_info)
self.save_button.grid(column = 2, row = 4)
def save_info(self):
global info
info = self.info_var.get()
def start(self):
self.root.mainloop()
def plot(self):
if self._animate is None:
self.scope = Scope(self._ax, self.filename.get())
self._canvas.draw_idle()
self._animate = animation.FuncAnimation(self._fig, self.scope.animate, frames = self.update, interval=2000, blit=False)
def update(self):
line = read_serial_data(self.port.get())
data = line.decode('utf-8')
yield data
time = datetime.now()
duration = time - start_time
measurement = {'time': time, 'dur': duration.seconds, 'CO2': data, 'info': info}
write_csv_line(self.filename.get(), measurement)
self.root.after(10000, self.update)
if __name__ == "__main__":
gui = GUI()
gui.start()
thread = Thread(target=read_serial_data,args=(gui.port,))
thread.start()
您实际上并不需要另一个线程,但可以在串行端口上使用 non-blocking IO 并使用 Tkinter after
调用来管理轮询间隔。 pyserial provices inWaiting
测试设备是否有等待读取的输入。如果有字节等待,只读那些。
这里是一个示例 reader class 从串行设备读取行,并在读取完整行后 post 将它们发送到应用程序处理程序方法。
class Reader(Serial):
def __init__(self, *args, **kwargs):
self.buffer = ''
super(Reader, self).__init__(*args, **kwargs)
def monitor(self, w, interval=10):
n = self.inWaiting()
if n != 0:
data = self.read(n).decode('ascii')
self.buffer = self.buffer + data
ndx = self.buffer.find("\n")
if ndx != -1:
line = self.buffer[:ndx+1]
self.buffer = self.buffer[ndx+1:]
w.after_idle(lambda: w.parse_input(line))
w.after(interval, lambda: self.monitor(w, interval))
像这样使用:
app = <tkinter application class instance>
reader = Reader(port, baudrate, timeout=0)
reader.flushInput()
reader.monitor(app, 10)
app.mainloop()
在这种情况下,每当读取一行(由换行符分隔)时,它将在 app
实例上调用 parse_input
方法。
如果您决定使用线程,那么您需要一个队列来将数据传递给 Tkinter UI 线程并且必须确保您不从工作线程调用 Tkinter 方法。