如何从连续数据源更新 matplotlib?

how to make matplotlib update from continuous data source?

我正在尝试根据从连续实时数据源收集的数据制作一个简单的图表。

我使用matplotlib的代码如下:

import matplotlib.pyplot as plt
import matplotlib.animation as animation
import time
from serialdata import SerialData 

fig = plt.figure()
ax1 = fig.add_subplot(1,1,1)

def animate(i):
    xar = []
    yar = []

    #Open Serial Port and Receive Continuous Data 
    #in format of number,number
    a = SerialData()
    b = a.setSerial('COM3', 9600)
    dataArray = a.getSerial(9999)

    for eachLine in dataArray:
        if len(eachLine)>1:
            x,y = eachLine.split(',')
            xar.append(int(x))
            yar.append(int(y))
    ax1.clear()
    ax1.plot(xar,yar)
ani = animation.FuncAnimation(fig, animate, interval=1000)
plt.show()

serialdata.py每次从数据源获取数据时只生成数据:

import serial
from time import sleep

class SerialData:
    def __init__(self):
        pass        

    def setSerial(self, port, baudrate):
        self.port = port
        self.baudrate = baudrate
        print("Opening Serial Port...")
        self.ser = serial.Serial(self.port, self.baudrate, timeout=1)
        sleep(2)
        print("Setup Successful") 

    def getSerial(self, read):
        while True:
            self.data = self.ser.read(read)
            if len(self.data)>0:
                yield self.data.decode('utf-8')
            sleep(.1)
        self.ser.close() 

应该以以下形式发送数据:

1,2
2,5
3,7
4(autoincrement),5(random number)

当我让它们在 CLI 上打印时工作正常。 但是我不能让它与 matplotlib 一起工作。

没有具体错误。

它只是显示

Opening Serial Port...
Setup Successful

然后...就是这样。然后什么也没有发生。 我的代码有什么问题?


我做了更多研究,发现我不应该使用 show() 所以我重写了我的代码如下:

import time
import numpy as np
import matplotlib.pyplot as plt
from serialdata import SerialData   

plt.axis([0, 1000, 0, 1])
plt.ion()
plt.show()

for i in range(1000):
    # y = np.random.random()

    a = SerialData()
    b = a.setSerial('COM3', 9600)
    dataArray = a.getSerial(9999)

    print("Data Gathering...")
    for eachLine in dataArray:

        if len(eachLine)>1:
            y = eachLine.split(',')[1]

            plt.scatter(i, y)
            plt.draw()
            time.sleep(0.05)

但是,结果是一样的

我没有串口,但这是我试图解决的问题:

import matplotlib.pyplot as plt
import matplotlib.animation as animation
import time

import random
from time import sleep

class MockData():
    def __init__(self, n):
        self.n = n  #number of data points to return

    def getData(self):
        start = 0 #mock autoincrement
        for i in range(self.n):
            yield (start, random.randint(0, 100)) #yield a tuple, not a str (x(autoincrem),y(random))
            start+=1  


fig = plt.figure()
ax1 = fig.add_subplot(1,1,1)

def animate(i):
    xar = []
    yar = []

    #Open Serial Port and Receive Continuous Data 
    #in format of number,number
    a = MockData(10)
    dataArray = a.getData() #this is a generator that will yield (x,y) tuples 10 times. 

    for eachLine in dataArray:
            x,y = eachLine
            xar.append(int(x))
            yar.append(int(y))
    ax1.clear()
    ax1.plot(xar,yar)

ani = animation.FuncAnimation(fig, animate, interval=1000)
plt.show()

此代码 fail\be 非常慢:

  1. 如果我一次请求大量数据:a=MockData(1000) 每帧执行约 2-3 秒
  2. 如果读出时间很长,即

    def getData(self):
        start = 0
        for i in range(self.n):
            yield (start, random.randint(0, 100))
            start+=1
            time.sleep(1)
    

    这将每帧执行约 10 秒

  3. 两者都

据我所知,问题出在您的 SerialData class 中,特别是 getSerial 方法中。我知道这一点,因为这是负责检索实际数据的方法。由于我没有串口,我无法准确地测试在哪里,但我可以打赌一些猜测。

   def getSerial(self, read):
        while True:
            self.data = self.ser.read(read)
            if len(self.data)>0:
                yield self.data.decode('utf-8')
            sleep(.1)
        self.ser.close()
  1. self.data = self.ser.read(read)这里的问题是你request 9999 bytes to be read。 9600 波特约为 1200 bytes/s。要读取 9 999 个字节,大约需要 10 秒。那就是 if 实际上有 9999 个新字节要读取。如果没有 read 函数将继续等待,直到它读入该数量。这等于我的 1) 测试用例,只是等待时间为 sleep(10)。因此 if 检查已经在 read 函数中,因此您不必再次检查。
  2. self.data.decode('utf-8')看看解码9999字节需要多长时间:

    >>> from timeit import Timer
    >>> ts = Timer("s.decode('utf-8')", "s = b'1'*9999")
    >>> ts.timeit()
    2.524627058740407
    

    现在,当然,这与您的转换不同,但由于我的膝上型电脑上没有串行端口,因此无法对其进行测试。总之,好像很慢。现在,你有我的情况 2) sleep(12)

  3. sleep(.1)现在看来简直是雪上加霜

您的代码没有报告错误,因为它有效,只需要 3 分钟多的时间来读取第一组数据并绘制它。

我的建议是你忽略这种方法,读入数据,几乎是字面意思,一个字节一个字节,在它们出现时绘制它们。有一个数组,您只需在新字节到来时附加新字节并绘制它。你可以很容易地拉出类似的东西:

serialPort = serial.Serial(self.port, self.baudrate, timeout=1)
data = list() #some global list to append to

def animate():
    d = serialPort.read(2) #this will hang until it completes
    data.append(d) #does the data really have to be in string? 
    ax.plot(data)

ani = animation.FuncAnimation(fig, animate, interval=1000) #optionally send in repeat_delay?
plt.show()

尽管部分原因是我不确定如果数据数组开始变得非常大,动画会如何表现,因此您可以考虑将 x 轴向右移动,删除旧数据并定期创建一个新数组.

希望我能帮助您指出需要测试的内容。开始从串行端口对所有读入进行计时,以了解需要多长时间。不要忘记确保您实际上正在从串行端口读取 任何内容,如果您没有读取,serial.read 将在那里等待。

我也从来没有做过这样的事情,所以我可能偏离了正轨。上次和串口打交道是elementary(rofl)的robotics championship