无法在不出现错误/bug 的情况下读取串行数据

Unable to read serial data without errors/bugs

我正在编写一个基于 tkinter 的小应用程序,以便从我的 arduino 读取串行数据。当 arduino 收到串行文本(rf)时,它会开始向电脑发送数据。

以下是可疑代码:

def readSerial():
    """Read one line from the serial port, append it to the text widget, and re-schedule itself."""
    # NOTE(review): presumably ser.readline() waits for a full line (or the port
    # timeout) while running on the Tk event loop -- confirm how `ser` was opened.
    ser_bytes = ser.readline()
    ser_bytes = ser_bytes.decode("utf-8")
    text.insert("end", ser_bytes)
    # Schedule the next call in 100 ms. No `global after_id` is declared in this
    # function, so the handle is bound to a local name only.
    after_id=root.after(100,readSerial)
    #root.after(100,readSerial)


def measure_all():    
   """Send the 'rf' command to the Arduino and start the serial read loop."""
   global stop_
   stop_ = False
   ser.write("rf".encode()) #Send string 'rf' to arduino, which means Measure all Sensors
   readSerial() #Start Reading data

现在这行不通了。程序死机,终端上没有显示任何信息。

当我将行 after_id=root.after(100,readSerial) 更改为 root.after(100,readSerial) 时,程序将运行,但仅当我接收到串行输入时。

例如,如果 arduino 发送串口数据有 5 秒的延迟,那么程序就会卡住,直到它接收到数据。更具体地说,如果程序被最小化,然后我把它还原以便正常查看,它不会响应,除非它收到来自 arduino 的输入(此时会正常显示)。

所以即使是现在,它仍然无法正常工作。

但也要记住,我需要有 after_id 行,这样我才能有一个句柄,这样我就可以终止 readSerial() 函数(例如,当用户按下 'stop measurement' 按钮)。

有人能理解发生了什么吗,以及我如何才能拥有 after_id 行为(这样我可以稍后停止连续功能),同时让程序正常运行,而不会崩溃或卡住,直到它收到数据?

编辑:这是根据用户 acw1668 的建议修改后的代码。它仍然行不通:我在 tkinter 的文本框中什么也看不到。

import tkinter as tk
import tkinter.ttk as ttk
import serial.tools.list_ports #for a list of all the COM ports
from tkinter import scrolledtext
import threading
import time
from queue import SimpleQueue


# canvas size in pixels; the canvas is only used to give the window its initial size
HEIGHT = 800
WIDTH = 800

#hardcoded baud rate
baudRate = 9600

# this is the global variable that will hold the serial object value
ser = None #initial  value. will change at 'on_select()'

# NOTE(review): not referenced by the functions visible in this file -- presumably
# left over from the earlier root.after()-based reader; confirm before removing
after_id = None

#this is the global variable that will hold the value from the dropdown for the sensor select
dropdown_value = None

# create the queue for holding serial data
# (filled by readSerial(), drained by data_monitor())
queue = SimpleQueue()
# flag use to start/stop thread tasks:
# a threading.Event while readSerial() is looping, None otherwise
stop_flag = None


# --- functions ---

#the following two functions are for the serial port selection, on frame 1

#this function populates the combobox on frame1, with all the serial ports of the system
def serial_ports():
    """Return the serial ports reported by ``serial.tools.list_ports.comports()``.

    The result is passed as-is to the combobox on frame 1 as its list of values.
    """
    available_ports = serial.tools.list_ports.comports()
    return available_ports


#when the user selects one serial port from the combobox, this function will execute
def on_select(event=None):
    """Open the serial port chosen in the combobox and store it in the global ``ser``.

    Bound to the combobox ``<<ComboboxSelected>>`` event; ``event`` is unused, so
    the function can also be called directly.

    The combobox entries are the string form of the objects returned by
    serial_ports(), i.e. "<device> - <description>". Only the device part is
    passed to ``serial.Serial``.
    """
    global ser
    selection = cb.get()
    # Split on the full " - " separator instead of a bare "-": device names may
    # themselves contain a hyphen (e.g. "/dev/cu.usbserial-1410"). strip() replaces
    # the blind "drop the last character", which ate a real character whenever the
    # text had no trailing space (e.g. a manually typed "COM3").
    port_name = selection.partition(" - ")[0].strip()
    if not port_name:
        # nothing selected/typed yet: keep the current port untouched
        return

    # Release the previously opened port first; otherwise the handle leaks and
    # re-selecting the same port can fail because it is still held open.
    if ser is not None and ser.is_open:
        ser.close()

    # short read timeout so a readline() on a silent device returns quickly
    ser = serial.Serial(port=port_name, baudrate=baudRate, timeout=0.1)


def readSerial(queue):
    """Forward decoded serial lines to ``queue`` until the stop flag is set.

    Meant to run in a worker thread. While it is looping, the global
    ``stop_flag`` holds a ``threading.Event``; setting that event ends the loop.
    ``stop_flag`` is reset to ``None`` when the function leaves the loop.

    Args:
        queue: object with a ``put()`` method that receives each decoded line (str).
    """
    global stop_flag
    if stop_flag:
        print("Reading task is already running")
        return

    print("started")
    stop_event = threading.Event()
    stop_flag = stop_event
    try:
        while not stop_event.is_set():
            if ser.in_waiting:
                ser_bytes = ser.readline()
                try:
                    data = ser_bytes.decode("utf-8")
                except UnicodeDecodeError:
                    # the original caught a non-existent exception name, which
                    # turned every undecodable line into a NameError
                    print("Unicode Error")
                else:
                    queue.put(data)
            else:
                # wait on the event instead of sleeping so a stop request
                # wakes the loop immediately
                stop_event.wait(0.1)
    finally:
        # always clear the flag, even if the serial port raises, so that a
        # later start is not rejected as "already running"
        print("stopped")
        stop_flag = None




# function to monitor whether data is in the queue
# if there is data, get it and insert into the text box
def data_monitor(queue):
    """Move all pending serial lines from ``queue`` into the text box, then re-schedule.

    Runs on the Tk event loop and re-arms itself every 100 ms via ``root.after``.

    Args:
        queue: queue filled by the reader thread; must provide ``empty()`` and ``get()``.
    """
    if not queue.empty():
        # Sample the scrollbar before inserting: only follow the tail if the
        # view was at the bottom when the new data arrived.
        follow_tail = vsb.get()[1] == 1.0
        # Drain everything that is pending. Taking a single item per tick caps
        # the display at 10 lines/s and lets it fall behind a faster device.
        while not queue.empty():
            text.insert("end", f"{queue.get()}\n")
        if follow_tail:
            text.see("end")
    root.after(100, data_monitor, queue)



    
   


# this function is triggered, when a value is selected from the dropdown
def dropdown_selection(*args):
    """Remember the sensor picked in the dropdown and enable the single-sensor button.

    Used as a variable-trace callback; whatever arguments the trace passes are ignored.
    """
    global dropdown_value
    chosen_sensor = clicked.get()
    dropdown_value = chosen_sensor
    # a sensor is now selected, so 'Measure this Sensor' becomes clickable
    button_single['state'] = 'normal'


# this function is triggered, when button 'Measure all Sensors' is pressed, on frame 2
def measure_all():
    """Handle the 'Measure all Sensors' button: request data and start the reader thread.

    Enables the stop button, sends the ``rf`` command over the serial port and
    starts ``readSerial`` in a background thread so the GUI stays responsive.
    """
    button_stop['state'] = 'normal'  # make the 'Stop Measurement' button accessible
    ser.write("rf".encode())  # 'rf' tells the arduino to measure all sensors
    # Only `time` is imported in this file; the bare name `sleep` raised a
    # NameError here, so the reader thread below was never started.
    time.sleep(0.05)  # 50 milliseconds
    # daemon thread: it must not keep the process alive after the window is closed
    threading.Thread(target=readSerial, args=(queue,), daemon=True).start()



# this function is triggered, when button 'Measure this Sensor' is pressed, on frame 2
def measure_single():
    """Handle the 'Measure this Sensor' button: request one sensor and start the reader thread.

    Sends the value currently stored in ``dropdown_value`` to the arduino and
    starts ``readSerial`` in a background thread, the same way measure_all() does.
    """
    global stop_
    # legacy flag from the earlier after()-based reader; still assigned so any
    # code that reads it keeps working (the reader thread itself uses stop_flag)
    stop_ = False
    button_stop['state'] = 'normal'
    ser.write(dropdown_value.encode())  # send the selected sensor id to the arduino
    # readSerial() requires the queue argument and loops until stopped, so it
    # must run in a worker thread rather than be called directly from the GUI callback
    threading.Thread(target=readSerial, args=(queue,), daemon=True).start()


# this function is triggered, when button 'STOP measurement(s)' is pressed, on frame 2
def stop_measurement():
    """Handle the 'STOP measurement(s)' button.

    Disables the stop button, sends the ``c`` command to the arduino and, if a
    reader task is running, signals it to stop through ``stop_flag``.
    """
    button_stop['state'] = 'disabled'
    ser.write("c".encode())
    running_flag = stop_flag
    if not running_flag:
        print("Reading task is not running")
        return
    running_flag.set()
# --- functions ---



# --- main ---
# Create the top-level window; every frame below is placed inside `root`.
root = tk.Tk() #here we create our tkinter window
root.title("Sensor Interface")

#we use canvas as a placeholder, to get our initial screen size (we have defined HEIGHT and WIDTH)
canvas = tk.Canvas(root, height=HEIGHT, width=WIDTH)
canvas.pack()

#we use frames to organize all the widgets in the screen

'''
relheight, relwidth − Height and width as a float between 0.0 and 1.0, as a fraction of the height and width of the parent widget.
relx, rely − Horizontal and vertical offset as a float between 0.0 and 1.0, as a fraction of the height and width of the parent widget.
'''

# --- frame 1 ---
# Frame 1: serial port selection (label + combobox).
frame1 = tk.Frame(root)
frame1.place(relx=0, rely=0.05, relheight=0.03, relwidth=1, anchor='nw') #we use relheight and relwidth to fill whatever the parent is - in this case- root

label0 = tk.Label(frame1, text="Select the COM port that the device is plugged in: ")
label0.config(font=("TkDefaultFont", 8))
label0.place(relx = 0.1, rely=0.3, relwidth=0.3, relheight=0.5)


# the list of ports is read once, when the combobox is created
cb = ttk.Combobox(frame1, values=serial_ports())
cb.place(relx=0.5, rely=0.5, anchor='center')
# assign function to combobox, that will run when an item is selected from the dropdown
cb.bind('<<ComboboxSelected>>', on_select)
# --- frame 1 ---



# --- frame 2 ---
# Frame 2: 'Measure all Sensors' button, sensor dropdown and 'Measure this Sensor' button.
frame2 = tk.Frame(root, bd=5) #REMOVED THIS bg='#80c1ff' (i used it to see the borders of the frame)
frame2.place(relx=0, rely=0.1, relheight=0.07, relwidth=1, anchor='nw')

#Button for 'Measure All Sensors'
#it will be enabled initially
button_all = tk.Button(frame2, text="Measure all Sensors", bg='#80c1ff', fg='red', state='normal', command=measure_all)  #bg='gray'
button_all.place(relx=0.2, rely=0.5, anchor='center')

#label
label1 = tk.Label(frame2, text="OR, select a single sensor to measure: ")
label1.config(font=("TkDefaultFont", 9))
label1.place(relx = 0.32, rely=0.3, relwidth=0.3, relheight=0.4)

#dropdown
#OPTIONS = [0,1,2,3,4,5,6,7]
OPTIONS = list(range(8)) #[0,1,2,3,4,5,6,7]
clicked = tk.StringVar(master=frame2) # Always pass the `master` keyword argument, in order to run the function when we select from the dropdown
# the default is set before the trace is attached, so this set() does not call dropdown_selection()
clicked.set(OPTIONS[0]) # default value
# NOTE(review): trace("w", ...) looks like the legacy tracing API; newer tkinter code
# uses trace_add("write", ...) -- confirm against the targeted Python version
clicked.trace("w", dropdown_selection) #When a value from the dropdown is selected, function dropdown_selection() is executed
drop = tk.OptionMenu(frame2, clicked, *OPTIONS)
drop.place(relx = 0.65, rely=0.25, relwidth=0.08, relheight=0.6)

#Button for 'Measure Single Sensor'
#this will be disabled initially, and will be enabled when an item from the dropdown is selected
button_single = tk.Button(frame2, text="Measure this Sensor", bg='#80c1ff', fg='red', state='disabled', command=measure_single) #bg='gray'
button_single.place(relx = 0.85, rely=0.5, anchor='center')
# --- frame 2 ---


# --- frame 3 ---
# Frame 3: the stop button.
frame3 = tk.Frame(root, bd=5) #REMOVED THIS bg='#80c1ff' (i used it to see the borders of the frame)
frame3.place(relx=0, rely=0.2, relheight=0.07, relwidth=1, anchor='nw')

#Button for 'STOP Measurement(s)'
#this will be disabled initially, and will be enabled only when a measurement is ongoing
# (enabled by measure_all()/measure_single(), disabled again by stop_measurement())
button_stop = tk.Button(frame3, text="STOP measurement(s)", bg='#80c1ff', fg='red', state='disabled', command=stop_measurement)
button_stop.place(relx=0.5, rely=0.5, anchor='center')
# --- frame 3 ---



# --- frame 4 ---
# Frame 4: dropdown for choosing which sensor to plot (the plot button is still commented out below).
frame4 = tk.Frame(root, bd=5)
frame4.place(relx=0, rely=0.3, relheight=0.09, relwidth=1, anchor='nw')

label2 = tk.Label(frame4, text="Select a sensor to plot data: ")
label2.place(relx = 0.1, rely=0.3, relwidth=0.3, relheight=0.5)

# separate variable from `clicked`, so the plot selection does not trigger dropdown_selection()
clickedForPlotting = tk.StringVar()
clickedForPlotting.set(OPTIONS[0]) # default value
dropPlot = tk.OptionMenu(frame4, clickedForPlotting, *OPTIONS)
dropPlot.place(relx=0.5, rely=0.5, anchor='center')

#CHANGE LATER
#dropDownButton = tk.Button(frame4, text="Plot sensor data", bg='#80c1ff', fg='red', command=single_Sensor) #bg='gray'
#dropDownButton.place(relx = 0.85, rely=0.5, anchor='center')
# --- frame 4 ---


#frame 5 will be the save to txt file


#frame 6 will be the area with the text field
# --- frame 6 ---
# Frame 6: text box showing the received serial data, with a vertical scrollbar.
frame6 = tk.Frame(root, bg='#80c1ff') #remove color later
frame6.place(relx=0.0, rely=0.4, relheight=1, relwidth=1, anchor='nw')

text_frame=tk.Frame(frame6)
text_frame.place(relx=0, rely=0, relheight=0.6, relwidth=1, anchor='nw')
text=tk.Text(text_frame)
text.place(relx=0, rely=0, relheight=1, relwidth=1, anchor='nw')
vsb=tk.Scrollbar(text_frame)
vsb.pack(side='right',fill='y')
# link text box and scrollbar in both directions; data_monitor() reads vsb.get()
# to decide whether to auto-scroll
text.config(yscrollcommand=vsb.set)
vsb.config(command=text.yview)
# --- frame 6 ---


# start data monitor task
# first call of the polling task; it re-schedules itself with root.after() from then on
data_monitor(queue)



root.mainloop() #here we run our app
# --- main ---

为了不阻塞 tkinter 主应用,建议使用线程(thread)来运行串口读取。同时使用 queue.SimpleQueue 将串行数据传输到主线程,以便可以将串行数据插入到 Text 小部件中。

下面是一个例子:

import threading
import time
from queue import SimpleQueue
import tkinter as tk
import serial

class SerialReader(threading.Thread):
    """Thread that reads lines from a serial port and puts the decoded text on a queue.

    Args:
        ser: serial object providing ``in_waiting`` and ``readline()``.
        queue: object with a ``put()`` method that receives each decoded line (str).
        *args, **kw: forwarded unchanged to ``threading.Thread``.
    """

    def __init__(self, ser, queue, *args, **kw):
        super().__init__(*args, **kw)
        self.ser = ser
        self.queue = queue
        # set by terminate() to end the loop in run()
        self._stop_flag = threading.Event()

    def run(self):
        """Poll the serial port until terminate() is called."""
        print("started")
        while not self._stop_flag.is_set():
            if self.ser.in_waiting:
                ser_bytes = self.ser.readline()
                try:
                    data = ser_bytes.decode("utf-8")
                except UnicodeDecodeError:
                    # a corrupted line must not kill the reader thread: report and skip it
                    print("Unicode Error")
                    continue
                self.queue.put(data)
            else:
                # wait on the event instead of sleeping so terminate() takes effect at once
                self._stop_flag.wait(0.1)
        print("stopped")

    def terminate(self):
        """Ask the thread to stop; run() returns after its current iteration."""
        self._stop_flag.set()

# create the serial instance
# NOTE(review): "COM1" looks like a placeholder port name -- replace with the real device port
ser = serial.Serial(port="COM1") # provide other parameters as well
# create the queue for holding serial data
queue = SimpleQueue()
# the serial reader task
# (a SerialReader once start_reader() has run; reset to None by stop_reader())
reader = None

def start_reader():
    """Start the serial reader thread unless one is already running.

    A fresh ``SerialReader`` is created whenever there is no live reader. This
    covers a thread that ended on its own: a ``threading.Thread`` can only be
    started once, so re-using the finished object would raise ``RuntimeError``.
    """
    global reader
    if reader is not None and reader.is_alive():
        print("Reader is already running")
        return
    # create and start the serial reader task
    reader = SerialReader(ser, queue, daemon=True)
    reader.start()

def stop_reader():
    """Ask the running reader thread to stop and forget it; report if none is running."""
    global reader
    if not reader or not reader.is_alive():
        print("Reader is not running")
        return
    # stop the serial reader task
    reader.terminate()
    reader = None

# function to monitor whether data is in the queue
# if there is data, get it and insert into the text box
def data_monitor(queue):
    """Move all pending serial lines from ``queue`` into the text box, then re-schedule.

    Runs on the Tk event loop and re-arms itself every 100 ms via ``root.after``.

    Args:
        queue: queue filled by the reader thread; must provide ``empty()`` and ``get()``.
    """
    # Drain everything that is pending. Taking a single item per tick caps the
    # display at 10 lines/s and lets it fall behind a faster device.
    while not queue.empty():
        text.insert("end", f"{queue.get()}\n")
    root.after(100, data_monitor, queue)

# build the window: a text box for the received data and Start/Stop buttons
root = tk.Tk()

text = tk.Text(root, width=80, height=20)
text.pack()

frame = tk.Frame(root)
tk.Button(frame, text="Start", command=start_reader).pack(side="left")
tk.Button(frame, text="Stop", command=stop_reader).pack(side="left")
frame.pack()

# start data monitor task
# (it re-schedules itself with root.after() from then on)
data_monitor(queue)

root.mainloop()

Update@2021-04-16:不使用 class 的示例

import threading
import time
from queue import SimpleQueue
import tkinter as tk
import serial

# create the serial instance
# NOTE(review): "COM1" looks like a placeholder port name -- replace with the real device port
ser = serial.Serial(port="COM1") # provide other parameters as well
# create the queue for holding serial data
queue = SimpleQueue()
# flag use to start/stop thread tasks:
# a threading.Event while readSerial() is looping, None otherwise
stop_flag = None

def readSerial(queue):
    """Forward decoded serial lines to ``queue`` until the stop flag is set.

    Meant to run in a worker thread. While it is looping, the global
    ``stop_flag`` holds a ``threading.Event``; setting that event ends the loop.
    ``stop_flag`` is reset to ``None`` when the function leaves the loop.

    Args:
        queue: object with a ``put()`` method that receives each decoded line (str).
    """
    global stop_flag
    if stop_flag:
        print("Reading task is already running")
        return

    print("started")
    stop_event = threading.Event()
    stop_flag = stop_event
    try:
        while not stop_event.is_set():
            if ser.in_waiting:
                ser_bytes = ser.readline()
                try:
                    data = ser_bytes.decode("utf-8")
                except UnicodeDecodeError:
                    # a corrupted line must not kill the reader: report and skip it
                    print("Unicode Error")
                else:
                    queue.put(data)
            else:
                # wait on the event instead of sleeping so a stop request
                # wakes the loop immediately
                stop_event.wait(0.1)
    finally:
        # always clear the flag; otherwise an exception in the loop would leave
        # it set and every later start would report "already running"
        print("stopped")
        stop_flag = None

def start_reader():
    """Run ``readSerial`` in a background thread so the GUI stays responsive."""
    # daemon thread, as in the class-based example: a reader that is still
    # looping must not keep the process alive after the window is closed
    threading.Thread(target=readSerial, args=(queue,), daemon=True).start()

def stop_reader():
    """Signal the running reader task to stop; report if none is running."""
    running_flag = stop_flag
    if not running_flag:
        print("Reading task is not running")
        return
    running_flag.set()

# function to monitor whether data is in the queue
# if there is data, get it and insert into the text box
def data_monitor(queue):
    """Move all pending serial lines from ``queue`` into the text box, then re-schedule.

    Runs on the Tk event loop and re-arms itself every 100 ms via ``root.after``.

    Args:
        queue: queue filled by the reader thread; must provide ``empty()`` and ``get()``.
    """
    # Drain everything that is pending. Taking a single item per tick caps the
    # display at 10 lines/s and lets it fall behind a faster device.
    while not queue.empty():
        text.insert("end", f"{queue.get()}\n")
    root.after(100, data_monitor, queue)

# build the window: a text box for the received data and Start/Stop buttons
root = tk.Tk()

text = tk.Text(root, width=80, height=20)
text.pack()

frame = tk.Frame(root)
tk.Button(frame, text="Start", command=start_reader).pack(side="left")
tk.Button(frame, text="Stop", command=stop_reader).pack(side="left")
frame.pack()

# start data monitor task
# (it re-schedules itself with root.after() from then on)
data_monitor(queue)

root.mainloop()