Python / PySerial / Arduino 读取串行数据,然后将此确切的串行数据写入 txt 文件
Python / PySerial / Arduino Reading serial data and later writing this exact serial data to the txt file
关于这个案例我正在做一个新的 post 因为我在第一个案例中被误解了...
我有一个代码可以从 Arduino 读取串行数据,当在键盘上按下某些特定数字时,它会将这些数字写入 Arduino。 当我 运行 它 时,这段代码完美运行,它读取串行数据并且我能够将数据写入 Arduino。我使用 线程和 PySerial 库来实现这一点。
from pynput import keyboard
import threading
import serial
import sys
ser = None
class SerialReaderThread(threading.Thread):
def run(self):
global ser
ser = serial.Serial('COM3', baudrate = 9600, timeout = 5)
while True:
print(ser.readline().decode('utf-8'))
class KeyboardThread(threading.Thread):
def run(self):
def on_press(key):
try:
format(key.char)
if key.char == "1":
ser.write(b'1\r\n') #serial write - 1
elif key.char == "2":
ser.write(b'2\r\n') #serial write - 2
elif key.char == "3":
ser.write(b'3\r\n') #serial write - 3
elif key.char == "4":
ser.write(b'4\r\n') #serial write - 4
elif key.char == "5":
ser.write(b'5\r\n') #serial write - 5
elif key.char == "6":
ser.write(b'6\r\n') #serial write - 6
elif key.char == "0":
ser.write(b'0\r\n') #serial write - 0
except AttributeError:
format(key)
with keyboard.Listener(on_press=on_press) as listener:
listener.join()
listener = keyboard.Listener(on_press=on_press)
listener.start()
serial_thread = SerialReaderThread()
keyboard_thread = KeyboardThread()
serial_thread.start()
keyboard_thread.start()
serial_thread.join()
keyboard_thread.join()
在此之后 我想到我也可以将这个串行数据准确地写入到 windows 上的 .txt 文件中。所以我创建了一个名为 FileWriting 的新线程,并决定将 ser.readline().decode('utf-8') 写入它,但是它不再起作用了...这是我为写入 .txt 文件而编写的新修改代码。
from pynput import keyboard
import threading
import serial
import sys
import io
ser = None
class SerialReaderThread(threading.Thread):
def run(self):
global ser
ser = serial.Serial('COM3', baudrate = 9600, timeout = 5)
while True:
print(ser.readline().decode('utf-8'))
class FileWriting(threading.Thread):
def run(self):
while True:
with io.open("output.txt", "a", encoding="utf-8") as f:
f.write(ser.readline().decode('utf-8'))
class KeyboardThread(threading.Thread):
def run(self):
def on_press(key):
try:
format(key.char)
if key.char == "1":
ser.write(b'1\r\n') #serial write - 1
elif key.char == "2":
ser.write(b'2\r\n') #serial write - 2
elif key.char == "3":
ser.write(b'3\r\n') #serial write - 3
elif key.char == "4":
ser.write(b'4\r\n') #serial write - 4
elif key.char == "5":
ser.write(b'5\r\n') #serial write - 5
elif key.char == "6":
ser.write(b'6\r\n') #serial write - 6
elif key.char == "0":
ser.write(b'0\r\n') #serial write - 0
except AttributeError:
format(key)
with keyboard.Listener(on_press=on_press) as listener:
listener.join()
listener = keyboard.Listener(on_press=on_press)
listener.start()
serial_thread = SerialReaderThread()
keyboard_thread = KeyboardThread()
file_thread = FileWriting()
serial_thread.start()
keyboard_thread.start()
file_thread.start()
serial_thread.join()
keyboard_thread.join()
file_thread.join()
很明显,我只添加了一个名为 file_thread 的新线程,现在我 运行 串行数据的代码打印以及向 Arduino 写入数据时工作正常,但是,代码没有向 .txt 文件写入任何内容并给我一个错误:
Exception in thread Thread-3:
Traceback (most recent call last):
File "C:\Python\lib\threading.py", line 932, in _bootstrap_inner
self.run()
File "C:\Users\ultra\Desktop\work\menucode.py", line 32, in run
f.write(ser.readline().decode('utf-8'))
AttributeError: 'NoneType' object has no attribute 'readline'
如果有人在读取串行数据和写入文本文件时遇到与 Arduino 类似的问题,或者如果有人知道如何解决这个问题,请告诉我我现在非常绝望和一切不胜感激。
在文件的顶部,您声明 ser = None
。您收到的错误消息表明 ser
对象在 FileWriting
线程尝试访问它之前尚未设置为 Serial
对象。
解决此问题的快速方法是
ser = serial.Serial('COM3', baudrate = 9600, timeout = 5)
在启动任何线程之前。
但是,这可能会使程序行为异常,因为您在不同的线程中有两个相互竞争的 ser.readline()
调用。这可能会导致大约一半的 Arduino 输出数据被每个线程捕获(取决于 pyserial 如何处理对同一资源的多个请求)。为避免此问题,我建议让单个线程与 ser
对象接口,并让该线程使用 queue.
将数据传递给其他线程
这个数据交换如何进行的简单示例:
import queue
import serial
q = queue.Queue()
ser = serial.Serial('COM3', baudrate = 9600, timeout = 5)
class SerialReaderThread(threading.Thread):
def run(self):
while True:
# Read output from ser
output = ser.readline().decode('utf-8')
print(output)
# Add output to queue
q.put(output)
class FileWriting(threading.Thread):
def run(self):
while True:
output = q.get() # This will wait until an item is available in the queue
with open("output.txt", "a+") as f:
f.write(output)
f.write("\n") # If you want outputs separated by newlines
关于这个案例我正在做一个新的 post 因为我在第一个案例中被误解了...
我有一个代码可以从 Arduino 读取串行数据,当在键盘上按下某些特定数字时,它会将这些数字写入 Arduino。 当我 运行 它 时,这段代码完美运行,它读取串行数据并且我能够将数据写入 Arduino。我使用 线程和 PySerial 库来实现这一点。
from pynput import keyboard
import threading
import serial
import sys
ser = None
class SerialReaderThread(threading.Thread):
def run(self):
global ser
ser = serial.Serial('COM3', baudrate = 9600, timeout = 5)
while True:
print(ser.readline().decode('utf-8'))
class KeyboardThread(threading.Thread):
def run(self):
def on_press(key):
try:
format(key.char)
if key.char == "1":
ser.write(b'1\r\n') #serial write - 1
elif key.char == "2":
ser.write(b'2\r\n') #serial write - 2
elif key.char == "3":
ser.write(b'3\r\n') #serial write - 3
elif key.char == "4":
ser.write(b'4\r\n') #serial write - 4
elif key.char == "5":
ser.write(b'5\r\n') #serial write - 5
elif key.char == "6":
ser.write(b'6\r\n') #serial write - 6
elif key.char == "0":
ser.write(b'0\r\n') #serial write - 0
except AttributeError:
format(key)
with keyboard.Listener(on_press=on_press) as listener:
listener.join()
listener = keyboard.Listener(on_press=on_press)
listener.start()
serial_thread = SerialReaderThread()
keyboard_thread = KeyboardThread()
serial_thread.start()
keyboard_thread.start()
serial_thread.join()
keyboard_thread.join()
在此之后 我想到我也可以将这个串行数据准确地写入到 windows 上的 .txt 文件中。所以我创建了一个名为 FileWriting 的新线程,并决定将 ser.readline().decode('utf-8') 写入它,但是它不再起作用了...这是我为写入 .txt 文件而编写的新修改代码。
from pynput import keyboard
import threading
import serial
import sys
import io
ser = None
class SerialReaderThread(threading.Thread):
def run(self):
global ser
ser = serial.Serial('COM3', baudrate = 9600, timeout = 5)
while True:
print(ser.readline().decode('utf-8'))
class FileWriting(threading.Thread):
def run(self):
while True:
with io.open("output.txt", "a", encoding="utf-8") as f:
f.write(ser.readline().decode('utf-8'))
class KeyboardThread(threading.Thread):
def run(self):
def on_press(key):
try:
format(key.char)
if key.char == "1":
ser.write(b'1\r\n') #serial write - 1
elif key.char == "2":
ser.write(b'2\r\n') #serial write - 2
elif key.char == "3":
ser.write(b'3\r\n') #serial write - 3
elif key.char == "4":
ser.write(b'4\r\n') #serial write - 4
elif key.char == "5":
ser.write(b'5\r\n') #serial write - 5
elif key.char == "6":
ser.write(b'6\r\n') #serial write - 6
elif key.char == "0":
ser.write(b'0\r\n') #serial write - 0
except AttributeError:
format(key)
with keyboard.Listener(on_press=on_press) as listener:
listener.join()
listener = keyboard.Listener(on_press=on_press)
listener.start()
serial_thread = SerialReaderThread()
keyboard_thread = KeyboardThread()
file_thread = FileWriting()
serial_thread.start()
keyboard_thread.start()
file_thread.start()
serial_thread.join()
keyboard_thread.join()
file_thread.join()
很明显,我只添加了一个名为 file_thread 的新线程,现在我 运行 串行数据的代码打印以及向 Arduino 写入数据时工作正常,但是,代码没有向 .txt 文件写入任何内容并给我一个错误:
Exception in thread Thread-3:
Traceback (most recent call last):
File "C:\Python\lib\threading.py", line 932, in _bootstrap_inner
self.run()
File "C:\Users\ultra\Desktop\work\menucode.py", line 32, in run
f.write(ser.readline().decode('utf-8'))
AttributeError: 'NoneType' object has no attribute 'readline'
如果有人在读取串行数据和写入文本文件时遇到与 Arduino 类似的问题,或者如果有人知道如何解决这个问题,请告诉我我现在非常绝望和一切不胜感激。
在文件的顶部,您声明 ser = None
。您收到的错误消息表明 ser
对象在 FileWriting
线程尝试访问它之前尚未设置为 Serial
对象。
解决此问题的快速方法是
ser = serial.Serial('COM3', baudrate = 9600, timeout = 5)
在启动任何线程之前。
但是,这可能会使程序行为异常,因为您在不同的线程中有两个相互竞争的 ser.readline()
调用。这可能会导致大约一半的 Arduino 输出数据被每个线程捕获(取决于 pyserial 如何处理对同一资源的多个请求)。为避免此问题,我建议让单个线程与 ser
对象接口,并让该线程使用 queue.
这个数据交换如何进行的简单示例:
import queue
import serial
q = queue.Queue()
ser = serial.Serial('COM3', baudrate = 9600, timeout = 5)
class SerialReaderThread(threading.Thread):
def run(self):
while True:
# Read output from ser
output = ser.readline().decode('utf-8')
print(output)
# Add output to queue
q.put(output)
class FileWriting(threading.Thread):
def run(self):
while True:
output = q.get() # This will wait until an item is available in the queue
with open("output.txt", "a+") as f:
f.write(output)
f.write("\n") # If you want outputs separated by newlines