线程读取 Python 中的串行端口(使用 GUI)

Threading reading a serial port in Python (with a GUI)

我想在运行正在使用 GUI 时,只要有数据要从串行端口读取就触发一个事件。 pySerial 模块显然具有实验性功能,但没有特别详细的记录(我找不到任何有用的示例 in the API)。

This question 似乎处理相同或至少非常相似的任务,但未提供复制它的说明或工作代码示例。

我想出了这个代码:

import tkinter as tk
import serial
import threading

# Create GUI window
window = tk.Tk()

# Initialize the port
myPort = serial.Serial('/dev/ttyUSB0')

# Function to call whenever there is data to be read
def readFunc(port):
    port.readline()
    print('Line read')

# Configure threading
t1 = threading.Thread(target = readFunc, args=[myPort])
t1.start()

# Main loop of the window
window.mainloop()

运行它确实触发了事件,但只有一次。这是为什么?有没有一种“推荐”的方法可以通过使用 pySerial 本身的功能来做到这一点?

或者,我也会 运行 读取和处理事件数据的函数,就像使用 GUI 元素一样。如果那是更好的解决方案,那将如何完成?

Related question(未回答),可能使这个问题重复

编辑:这是从下面的答案中得出的一个最小示例,它会在读取传入数据时更改标签的文本:

import tkinter as tk

from serial import Serial
from serial.threaded import ReaderThread, Protocol

app = tk.Tk()
label = tk.Label(text="A Label")
label.pack()

class SerialReaderProtocolRaw(Protocol):
    port = None

    def connection_made(self, transport):
        """Called when reader thread is started"""
        print("Connected, ready to receive data...")

    def data_received(self, data):
        """Called with snippets received from the serial port"""
        updateLabelData(data)

def updateLabelData(data):
    data = data.decode("utf-8")
    label['text']=data
    app.update_idletasks()

# Initiate serial port
serial_port = Serial("/dev/ttyACM0")

# Initiate ReaderThread
reader = ReaderThread(serial_port, SerialReaderProtocolRaw)
# Start reader
reader.start()

app.mainloop()

当您从另一个 运行 线程更新 GUI 时,您主要关心的是线程安全。
为此,我们可以使用 .after() 方法,该方法为任何给定的 tk 小部件执行回调。

您要求的另一部分是使用线程串行reader。
这可以通过使用 ReaderThread 和协议来实现。

您可以选择两个协议:

这是工作代码示例,上面提到了两种协议,因此您可以选择适合您的一种。请记住,所有来自串行端口的数据都只是原始字节。

import tkinter as tk

from serial import Serial
from serial.threaded import ReaderThread, Protocol, LineReader


class SerialReaderProtocolRaw(Protocol):
    tk_listener = None

    def connection_made(self, transport):
        """Called when reader thread is started"""
        if self.tk_listener is None:
            raise Exception("tk_listener must be set before connecting to the socket!")
        print("Connected, ready to receive data...")

    def data_received(self, data):
        """Called with snippets received from the serial port"""
        self.tk_listener.after(0, self.tk_listener.on_data, data.decode())


class SerialReaderProtocolLine(LineReader):
    tk_listener = None
    TERMINATOR = b'\n\r'

    def connection_made(self, transport):
        """Called when reader thread is started"""
        if self.tk_listener is None:
            raise Exception("tk_listener must be set before connecting to the socket!")
        super().connection_made(transport)
        print("Connected, ready to receive data...")

    def handle_line(self, line):
        """New line waiting to be processed"""
        # Execute our callback in tk
        self.tk_listener.after(0, self.tk_listener.on_data, line)


class MainFrame(tk.Frame):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.listbox = tk.Listbox(self)
        self.listbox.pack()
        self.pack()

    def on_data(self, data):
        print("Called from tk Thread:", data)
        self.listbox.insert(tk.END, data)


if __name__ == '__main__':
    app = tk.Tk()

    main_frame = MainFrame()
    # Set listener to our reader
    SerialReaderProtocolLine.tk_listener = main_frame
    # Initiate serial port
    serial_port = Serial("/dev/ttyUSB0")
    # Initiate ReaderThread
    reader = ReaderThread(serial_port, SerialReaderProtocolLine)
    # Start reader
    reader.start()

    app.mainloop()