使用 asyncio 读取串行端口的输出

Using asyncio to read the output of a serial port

我一直在为工作做一些端口读取,我需要知道我是否应该使用 asyncIO 来读取从这个端口进来的数据。

以下是有关系统构建方式的一些详细信息。

  1. 同时读取多个传感器,并可能产生同时输入的输出。 (所有数据通过连接到 USB 端口的探测器 ZU10 进入)所有数据一进入就必须加上时间戳。

  2. 数据应该被处理,然后通过 REST API.

  3. 发送到 django webapp

问题是,不丢失任何数据真的很重要,我问这个是因为我相信一定有比我想的更简单的方法来做到这一点。

我希望数据传入的方式是通过一个异步进程将数据放入队列并为其添加时间戳,这样就不会出现数据丢失的情况,并且可能没有时间戳超过几分之一秒,这不是问题。

如果有人有任何想法,我将不胜感激

这是我用来打开端口、接收数据的代码,以及到目前为止我在实际读取有意义的数据时得到的结果。

阅读部分如下:

import serial #for port opening
import sys #for exceptions

#

#configure the serial connections (the parameters differs on the device you are connecting to)

class Serializer: 
    def __init__(self, port, baudrate=9600, timeout=.1): 
        self.port = serial.Serial(port = port, baudrate=baudrate, 
        timeout=timeout, writeTimeout=timeout)

    def open(self): 
        ''' Abre Puerto Serial'''
        self.port.open()

    def close(self): 
        ''' Cierra Puerto Serial'''
        self.port.close() 

    def send(self, msg):
        ''' envía mensaje a dispositivo serial'''
        self.port.write(msg)

    def recv(self):
        ''' lee salidas del dispositivo serial '''
        return self.port.readline()

PORT = '/dev/ttyUSB0' #Esto puede necesitar cambiarse


# test main class made for testing
def main():
    test_port = Serializer(port = PORT)

    while True:
        print(test_port.recv())

if __name__ == "__main__":
    main()

还有一些我将用来过滤掉有意义的阅读的东西(忍受它,它可能充满了可怕的错误,也许是一个可怕的正则表达式):

import re
from Lector import ChecaPuertos
from Lector import entrada


patterns = [r'^{5}[0-9],2[0-9a-fA-F] $'] #pattern list

class IterPat:
    def __init__(self, lect, pat = patterns):
        self.pat = pat  # lista de patrones posibles para sensores
        self.lect = lect  # lectura siendo analizada
        self.patLen = len(pat)  #Largo de patrones

    def __iter__(self):
        self.iteracion = 0 #crea la variable a iterar.

    def __next__(self):
        '''
        Primero revisa si ya pasamos por todas las iteraciones posibles
        luego revisa si la iteración es la que pensabamos, de ser así regresa una
        tupla con el patrón correspondiente, y la lectura
        de otra forma para el valor de ser mostrado
        '''
        pattern = re.compile(self.pat[self.iteracion])
        comp = pattern.match(self.lect)
        if comp == True:
            re_value = (self.pattern, self.lect)
            return re_value
        else:
            self.iteración += 1

def main():
    puerto = ChecaPuertos.serial_ports()
    serial = entrada.Serializer(port = puerto[0])

    if serial != open:
        serial.open()
    while True:
        iter = IterPat()

     #This is incomplete right here.

我正在使用 asyncio read/write 带有 pyserial 的串口。我让串行连接另一端的设备在准备好接收有效载荷时写入一个字节。 Asyncio 监视该字节然后发送有效负载。它看起来像这样:

serial_f = serial.Serial(port=dev, baudrate=BAUDRATE, timeout=2)

def write_serial():
    status = serial_f.read(1)
    serial_f.write(buffer)

loop = asyncio.get_event_loop()
loop.add_reader(serial_f.fileno(), write_serial)