减少 python 程序计算的算法时间
decrease algorithm time on python program calculation
我在 python 上有 算法计算 过滤器 (low_pass,high_pass ...) ,在我的程序中,我正在从实时设备读取数据,我需要对其进行处理,然后将其传输回设备。
每个数据块每 10 毫秒出现一次,因此此时我必须计算尽可能多的过滤器。
我的算法 :
def do_calculation(self,indata,outdata):
for i in range(0,len(indata)):
val=self.a0*indata[i]
val+=self.a1*self.xn1
val+=self.a2*self.xn2
val-=self.b1*self.yn1
val-=self.b2*self.yn2
outdata[i]= val
self.xn2 = self.xn1
self.xn1 = indata[i]
self.yn2 = self.yn1
self.yn1 = outdata[i]
我在开始 read/write 之前计算的系数 (a0,a1,a2,b1,b2)。
我使用从设备获得的每个输入从 Main 函数调用此函数,对其进行处理,然后使用 outdata 将其写回设备。
indata 是这样大小为 512 的列表
列表[[x0,x1][x1,x2].......[x510,x511]]
有什么方法可以提高此功能的性能?或者 python 可能有很大的局限性。
现在,每个过滤器需要 2 或 3(有些过滤器甚至需要 5)毫秒,我想减少它以便我可以在 10 毫秒的范围内创建更多过滤器.
感谢帮助!
下面的第三个版本do_calculation3
比你的版本快一倍
class C():
def __init__(self):
self.a0 = .1
self.a1 = .02
self.a2 = -.3
self.b1 = .2
self.b2 = -.25
self.xn1 = -.1
self.xn2 = .11
self.yn1 = .12
self.yn2 = -.001
self.ab = [self.a0, self.a1, self.a2, -self.b1, -self.b2]
self.xy = [self.xn1, self.xn2, self.yn1, self.yn2]
def do_calculation(self,indata,outdata):
for i in range(0,len(indata)):
val=self.a0*indata[i]
val+=self.a1*self.xn1
val+=self.a2*self.xn2
val-=self.b1*self.yn1
val-=self.b2*self.yn2
outdata[i]= val
self.xn2 = self.xn1
self.xn1 = indata[i]
self.yn2 = self.yn1
self.yn1 = outdata[i]
def do_calculation2(self,indata,outdata):
xy = self.xy
ab = self.ab
for i, ini in enumerate(indata):
val = sum((n * m for n, m in zip(ab, [ini] + xy)))
outdata[i] = val
xy = [ini, xy[0], val, xy[2]]
self.xy = xy
def do_calculation3(self,indata,outdata):
j,k,l,m,n = self.ab
xn1, xn2, yn1, yn2 = self.xn1, self.xn2, self.yn1, self.yn2
for i, ini in enumerate(indata):
outdata[i] = val = j*ini + k*xn1 + l*xn2 + m*yn1 + n*yn2
xn1, xn2, yn1, yn2 = ini, xn1, val, yn1
self.xn1, self.xn2, self.yn1, self.yn2 = xn1, xn2, yn1, yn2
c = C()
dati = list((1/x if x else 0) for x in range(512))
dato = [0 for _ in dati]
c.do_calculation(dati, dato)
c2 = C()
dati2 = list((1/x if x else 0) for x in range(512))
dato2 = [0 for _ in dati2]
c2.do_calculation2(dati2, dato2)
c3 = C()
dati3 = list((1/x if x else 0) for x in range(512))
dato3 = [0 for _ in dati3]
c3.do_calculation3(dati3, dato3)
assert dato == dato2
assert dato == dato3
#%%
print('\n## do_calculation\n')
c = C()
dati = list((1/x if x else 0) for x in range(512))
dato = [0 for _ in dati]
%timeit c.do_calculation(dati, dato)
print('\n## do_calculation2\n')
c2 = C()
dati2 = list((1/x if x else 0) for x in range(512))
dato2 = [0 for _ in dati2]
%timeit c2.do_calculation2(dati2, dato2)
print('\n## do_calculation3\n')
c3 = C()
dati3 = list((1/x if x else 0) for x in range(512))
dato3 = [0 for _ in dati3]
%timeit c3.do_calculation3(dati3, dato3)
时间
## do_calculation
887 µs ± 35.9 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
## do_calculation2
1.49 ms ± 417 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
## do_calculation3
332 µs ± 83.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
我在 python 上有 算法计算 过滤器 (low_pass,high_pass ...) ,在我的程序中,我正在从实时设备读取数据,我需要对其进行处理,然后将其传输回设备。 每个数据块每 10 毫秒出现一次,因此此时我必须计算尽可能多的过滤器。 我的算法 :
def do_calculation(self,indata,outdata):
for i in range(0,len(indata)):
val=self.a0*indata[i]
val+=self.a1*self.xn1
val+=self.a2*self.xn2
val-=self.b1*self.yn1
val-=self.b2*self.yn2
outdata[i]= val
self.xn2 = self.xn1
self.xn1 = indata[i]
self.yn2 = self.yn1
self.yn1 = outdata[i]
我在开始 read/write 之前计算的系数 (a0,a1,a2,b1,b2)。
我使用从设备获得的每个输入从 Main 函数调用此函数,对其进行处理,然后使用 outdata 将其写回设备。 indata 是这样大小为 512 的列表 列表[[x0,x1][x1,x2].......[x510,x511]]
有什么方法可以提高此功能的性能?或者 python 可能有很大的局限性。 现在,每个过滤器需要 2 或 3(有些过滤器甚至需要 5)毫秒,我想减少它以便我可以在 10 毫秒的范围内创建更多过滤器.
感谢帮助!
下面的第三个版本do_calculation3
比你的版本快一倍
class C():
def __init__(self):
self.a0 = .1
self.a1 = .02
self.a2 = -.3
self.b1 = .2
self.b2 = -.25
self.xn1 = -.1
self.xn2 = .11
self.yn1 = .12
self.yn2 = -.001
self.ab = [self.a0, self.a1, self.a2, -self.b1, -self.b2]
self.xy = [self.xn1, self.xn2, self.yn1, self.yn2]
def do_calculation(self,indata,outdata):
for i in range(0,len(indata)):
val=self.a0*indata[i]
val+=self.a1*self.xn1
val+=self.a2*self.xn2
val-=self.b1*self.yn1
val-=self.b2*self.yn2
outdata[i]= val
self.xn2 = self.xn1
self.xn1 = indata[i]
self.yn2 = self.yn1
self.yn1 = outdata[i]
def do_calculation2(self,indata,outdata):
xy = self.xy
ab = self.ab
for i, ini in enumerate(indata):
val = sum((n * m for n, m in zip(ab, [ini] + xy)))
outdata[i] = val
xy = [ini, xy[0], val, xy[2]]
self.xy = xy
def do_calculation3(self,indata,outdata):
j,k,l,m,n = self.ab
xn1, xn2, yn1, yn2 = self.xn1, self.xn2, self.yn1, self.yn2
for i, ini in enumerate(indata):
outdata[i] = val = j*ini + k*xn1 + l*xn2 + m*yn1 + n*yn2
xn1, xn2, yn1, yn2 = ini, xn1, val, yn1
self.xn1, self.xn2, self.yn1, self.yn2 = xn1, xn2, yn1, yn2
c = C()
dati = list((1/x if x else 0) for x in range(512))
dato = [0 for _ in dati]
c.do_calculation(dati, dato)
c2 = C()
dati2 = list((1/x if x else 0) for x in range(512))
dato2 = [0 for _ in dati2]
c2.do_calculation2(dati2, dato2)
c3 = C()
dati3 = list((1/x if x else 0) for x in range(512))
dato3 = [0 for _ in dati3]
c3.do_calculation3(dati3, dato3)
assert dato == dato2
assert dato == dato3
#%%
print('\n## do_calculation\n')
c = C()
dati = list((1/x if x else 0) for x in range(512))
dato = [0 for _ in dati]
%timeit c.do_calculation(dati, dato)
print('\n## do_calculation2\n')
c2 = C()
dati2 = list((1/x if x else 0) for x in range(512))
dato2 = [0 for _ in dati2]
%timeit c2.do_calculation2(dati2, dato2)
print('\n## do_calculation3\n')
c3 = C()
dati3 = list((1/x if x else 0) for x in range(512))
dato3 = [0 for _ in dati3]
%timeit c3.do_calculation3(dati3, dato3)
时间
## do_calculation
887 µs ± 35.9 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
## do_calculation2
1.49 ms ± 417 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
## do_calculation3
332 µs ± 83.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)