如何从连续数据源更新 matplotlib?
how to make matplotlib update from continuous data source?
我正在尝试根据从连续实时数据源收集的数据制作一个简单的图表。
我使用matplotlib的代码如下:
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import time
from serialdata import SerialData
fig = plt.figure()
ax1 = fig.add_subplot(1,1,1)
def animate(i):
xar = []
yar = []
#Open Serial Port and Receive Continuous Data
#in format of number,number
a = SerialData()
b = a.setSerial('COM3', 9600)
dataArray = a.getSerial(9999)
for eachLine in dataArray:
if len(eachLine)>1:
x,y = eachLine.split(',')
xar.append(int(x))
yar.append(int(y))
ax1.clear()
ax1.plot(xar,yar)
ani = animation.FuncAnimation(fig, animate, interval=1000)
plt.show()
serialdata.py每次从数据源获取数据时只生成数据:
import serial
from time import sleep
class SerialData:
def __init__(self):
pass
def setSerial(self, port, baudrate):
self.port = port
self.baudrate = baudrate
print("Opening Serial Port...")
self.ser = serial.Serial(self.port, self.baudrate, timeout=1)
sleep(2)
print("Setup Successful")
def getSerial(self, read):
while True:
self.data = self.ser.read(read)
if len(self.data)>0:
yield self.data.decode('utf-8')
sleep(.1)
self.ser.close()
应该以以下形式发送数据:
1,2
2,5
3,7
4(autoincrement),5(random number)
当我让它们在 CLI 上打印时工作正常。
但是我不能让它与 matplotlib 一起工作。
没有具体错误。
它只是显示
Opening Serial Port...
Setup Successful
然后...就是这样。然后什么也没有发生。
我的代码有什么问题?
我做了更多研究,发现我不应该使用 show()
所以我重写了我的代码如下:
import time
import numpy as np
import matplotlib.pyplot as plt
from serialdata import SerialData
plt.axis([0, 1000, 0, 1])
plt.ion()
plt.show()
for i in range(1000):
# y = np.random.random()
a = SerialData()
b = a.setSerial('COM3', 9600)
dataArray = a.getSerial(9999)
print("Data Gathering...")
for eachLine in dataArray:
if len(eachLine)>1:
y = eachLine.split(',')[1]
plt.scatter(i, y)
plt.draw()
time.sleep(0.05)
但是,结果是一样的
我没有串口,但这是我试图解决的问题:
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import time
import random
from time import sleep
class MockData():
def __init__(self, n):
self.n = n #number of data points to return
def getData(self):
start = 0 #mock autoincrement
for i in range(self.n):
yield (start, random.randint(0, 100)) #yield a tuple, not a str (x(autoincrem),y(random))
start+=1
fig = plt.figure()
ax1 = fig.add_subplot(1,1,1)
def animate(i):
xar = []
yar = []
#Open Serial Port and Receive Continuous Data
#in format of number,number
a = MockData(10)
dataArray = a.getData() #this is a generator that will yield (x,y) tuples 10 times.
for eachLine in dataArray:
x,y = eachLine
xar.append(int(x))
yar.append(int(y))
ax1.clear()
ax1.plot(xar,yar)
ani = animation.FuncAnimation(fig, animate, interval=1000)
plt.show()
此代码 fail\be 非常慢:
- 如果我一次请求大量数据:
a=MockData(1000)
每帧执行约 2-3 秒
如果读出时间很长,即
def getData(self):
start = 0
for i in range(self.n):
yield (start, random.randint(0, 100))
start+=1
time.sleep(1)
这将每帧执行约 10 秒
- 两者都
据我所知,问题出在您的 SerialData
class 中,特别是 getSerial
方法中。我知道这一点,因为这是负责检索实际数据的方法。由于我没有串口,我无法准确地测试在哪里,但我可以打赌一些猜测。
def getSerial(self, read):
while True:
self.data = self.ser.read(read)
if len(self.data)>0:
yield self.data.decode('utf-8')
sleep(.1)
self.ser.close()
self.data = self.ser.read(read)
这里的问题是你request 9999 bytes to be read。 9600 波特约为 1200 bytes/s。要读取 9 999 个字节,大约需要 10 秒。那就是 if 实际上有 9999 个新字节要读取。如果没有 read
函数将继续等待,直到它读入该数量。这等于我的 1) 测试用例,只是等待时间为 sleep(10)
。因此 if
检查已经在 read
函数中,因此您不必再次检查。
self.data.decode('utf-8')
看看解码9999字节需要多长时间:
>>> from timeit import Timer
>>> ts = Timer("s.decode('utf-8')", "s = b'1'*9999")
>>> ts.timeit()
2.524627058740407
现在,当然,这与您的转换不同,但由于我的膝上型电脑上没有串行端口,因此无法对其进行测试。总之,好像很慢。现在,你有我的情况 2) sleep(12)
sleep(.1)
现在看来简直是雪上加霜
您的代码没有报告错误,因为它有效,只需要 3 分钟多的时间来读取第一组数据并绘制它。
我的建议是你忽略这种方法,读入数据,几乎是字面意思,一个字节一个字节,在它们出现时绘制它们。有一个数组,您只需在新字节到来时附加新字节并绘制它。你可以很容易地拉出类似的东西:
serialPort = serial.Serial(self.port, self.baudrate, timeout=1)
data = list() #some global list to append to
def animate():
d = serialPort.read(2) #this will hang until it completes
data.append(d) #does the data really have to be in string?
ax.plot(data)
ani = animation.FuncAnimation(fig, animate, interval=1000) #optionally send in repeat_delay?
plt.show()
尽管部分原因是我不确定如果数据数组开始变得非常大,动画会如何表现,因此您可以考虑将 x 轴向右移动,删除旧数据并定期创建一个新数组.
希望我能帮助您指出需要测试的内容。开始从串行端口对所有读入进行计时,以了解需要多长时间。不要忘记确保您实际上正在从串行端口读取 任何内容,如果您没有读取,serial.read 将在那里等待。
我也从来没有做过这样的事情,所以我可能偏离了正轨。上次和串口打交道是elementary(rofl)的robotics championship
我正在尝试根据从连续实时数据源收集的数据制作一个简单的图表。
我使用matplotlib的代码如下:
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import time
from serialdata import SerialData
fig = plt.figure()
ax1 = fig.add_subplot(1,1,1)
def animate(i):
xar = []
yar = []
#Open Serial Port and Receive Continuous Data
#in format of number,number
a = SerialData()
b = a.setSerial('COM3', 9600)
dataArray = a.getSerial(9999)
for eachLine in dataArray:
if len(eachLine)>1:
x,y = eachLine.split(',')
xar.append(int(x))
yar.append(int(y))
ax1.clear()
ax1.plot(xar,yar)
ani = animation.FuncAnimation(fig, animate, interval=1000)
plt.show()
serialdata.py每次从数据源获取数据时只生成数据:
import serial
from time import sleep
class SerialData:
def __init__(self):
pass
def setSerial(self, port, baudrate):
self.port = port
self.baudrate = baudrate
print("Opening Serial Port...")
self.ser = serial.Serial(self.port, self.baudrate, timeout=1)
sleep(2)
print("Setup Successful")
def getSerial(self, read):
while True:
self.data = self.ser.read(read)
if len(self.data)>0:
yield self.data.decode('utf-8')
sleep(.1)
self.ser.close()
应该以以下形式发送数据:
1,2
2,5
3,7
4(autoincrement),5(random number)
当我让它们在 CLI 上打印时工作正常。 但是我不能让它与 matplotlib 一起工作。
没有具体错误。
它只是显示
Opening Serial Port...
Setup Successful
然后...就是这样。然后什么也没有发生。 我的代码有什么问题?
我做了更多研究,发现我不应该使用 show() 所以我重写了我的代码如下:
import time
import numpy as np
import matplotlib.pyplot as plt
from serialdata import SerialData
plt.axis([0, 1000, 0, 1])
plt.ion()
plt.show()
for i in range(1000):
# y = np.random.random()
a = SerialData()
b = a.setSerial('COM3', 9600)
dataArray = a.getSerial(9999)
print("Data Gathering...")
for eachLine in dataArray:
if len(eachLine)>1:
y = eachLine.split(',')[1]
plt.scatter(i, y)
plt.draw()
time.sleep(0.05)
但是,结果是一样的
我没有串口,但这是我试图解决的问题:
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import time
import random
from time import sleep
class MockData():
def __init__(self, n):
self.n = n #number of data points to return
def getData(self):
start = 0 #mock autoincrement
for i in range(self.n):
yield (start, random.randint(0, 100)) #yield a tuple, not a str (x(autoincrem),y(random))
start+=1
fig = plt.figure()
ax1 = fig.add_subplot(1,1,1)
def animate(i):
xar = []
yar = []
#Open Serial Port and Receive Continuous Data
#in format of number,number
a = MockData(10)
dataArray = a.getData() #this is a generator that will yield (x,y) tuples 10 times.
for eachLine in dataArray:
x,y = eachLine
xar.append(int(x))
yar.append(int(y))
ax1.clear()
ax1.plot(xar,yar)
ani = animation.FuncAnimation(fig, animate, interval=1000)
plt.show()
此代码 fail\be 非常慢:
- 如果我一次请求大量数据:
a=MockData(1000)
每帧执行约 2-3 秒 如果读出时间很长,即
def getData(self): start = 0 for i in range(self.n): yield (start, random.randint(0, 100)) start+=1 time.sleep(1)
这将每帧执行约 10 秒
- 两者都
据我所知,问题出在您的 SerialData
class 中,特别是 getSerial
方法中。我知道这一点,因为这是负责检索实际数据的方法。由于我没有串口,我无法准确地测试在哪里,但我可以打赌一些猜测。
def getSerial(self, read):
while True:
self.data = self.ser.read(read)
if len(self.data)>0:
yield self.data.decode('utf-8')
sleep(.1)
self.ser.close()
self.data = self.ser.read(read)
这里的问题是你request 9999 bytes to be read。 9600 波特约为 1200 bytes/s。要读取 9 999 个字节,大约需要 10 秒。那就是 if 实际上有 9999 个新字节要读取。如果没有read
函数将继续等待,直到它读入该数量。这等于我的 1) 测试用例,只是等待时间为sleep(10)
。因此if
检查已经在read
函数中,因此您不必再次检查。self.data.decode('utf-8')
看看解码9999字节需要多长时间:>>> from timeit import Timer >>> ts = Timer("s.decode('utf-8')", "s = b'1'*9999") >>> ts.timeit() 2.524627058740407
现在,当然,这与您的转换不同,但由于我的膝上型电脑上没有串行端口,因此无法对其进行测试。总之,好像很慢。现在,你有我的情况 2)
sleep(12)
sleep(.1)
现在看来简直是雪上加霜
您的代码没有报告错误,因为它有效,只需要 3 分钟多的时间来读取第一组数据并绘制它。
我的建议是你忽略这种方法,读入数据,几乎是字面意思,一个字节一个字节,在它们出现时绘制它们。有一个数组,您只需在新字节到来时附加新字节并绘制它。你可以很容易地拉出类似的东西:
serialPort = serial.Serial(self.port, self.baudrate, timeout=1)
data = list() #some global list to append to
def animate():
d = serialPort.read(2) #this will hang until it completes
data.append(d) #does the data really have to be in string?
ax.plot(data)
ani = animation.FuncAnimation(fig, animate, interval=1000) #optionally send in repeat_delay?
plt.show()
尽管部分原因是我不确定如果数据数组开始变得非常大,动画会如何表现,因此您可以考虑将 x 轴向右移动,删除旧数据并定期创建一个新数组.
希望我能帮助您指出需要测试的内容。开始从串行端口对所有读入进行计时,以了解需要多长时间。不要忘记确保您实际上正在从串行端口读取 任何内容,如果您没有读取,serial.read 将在那里等待。
我也从来没有做过这样的事情,所以我可能偏离了正轨。上次和串口打交道是elementary(rofl)的robotics championship