使用 scipy.signal 库中的 savgol_filter 平滑 Python 上的在线数据
Smoothing online data on Python with savgol_filter from scipy.signal library
我想使用 scipy.signal 库中的 savgol_filter 过滤在线数据。但是当我尝试将它用于在线数据时(当新元素一个接一个地出现时)我意识到 savgol_filter 与在线数据相比有一些延迟 (window_length//2)使用离线数据(它们的元素可同时用于计算)。
我使用类似的代码(请见下文)
from queue import Queue, Empty
import numpy as np
from scipy.signal import savgol_filter
window_size = 5
data = list()
q = Queue()
d = [2.22, 2.22, 5.55, 2.22, 1.11, 0.01, 1.11, 4.44, 9.99, 1.11, 3.33]
for i in d:
q.put(i)
res = list()
while not q.empty():
element = q.get()
data.append(element)
length = len(data)
npd = np.array(data[length - window_size:])
if length >= window_size:
res.append(savgol_filter(npd , window_size, 2)[window_size // 2])
npd = np.array(data)
res2 = savgol_filter(npd , window_size, 2)
np.set_printoptions(precision=2)
print('source data ', npd)
print('online res ', np.array(res))
print('offline res ', res2)
- 我的假设对吗?能以某种方式纠正吗?
- 如果我是对的,能否请您建议类似的过滤器,在计算中没有这样的问题?
感谢更新您的问题!
问题是您的 online_res
方法丢失了部分数据。边缘值由 scipy 的 savgol_filter
处理,但不用于您的手动编码版本。
对于您的示例,请查看两个结果:
'online res': array([ 3.93, 3.17, 0.73, 0.2 , 1.11, 5.87, 6.37]))
'offline res': array([ 1.84, 3.52, 3.93, 3.17, 0.73, 0.2 , 1.11, 5.87, 6.37, 5.3, 1.84]))
它们是相同的,但是 offline res
处理了值 data[0:2]
和 data[-2:]
。在您的情况下,如果未指定特定 mode
,则将其设置为默认值 interpolate
:
When the ‘interp’ mode is selected (the default), no extension is
used. Instead, a degree polyorder polynomial is fit to the last
window_length values of the edges, and this polynomial is used to
evaluate the last window_length // 2 output values.
你没有为你的 online res
做这件事。
我为双方实施了一个简单的 polynomial fit
,然后得到了完全相同的结果:
from queue import Queue, Empty
import numpy as np
from scipy.signal import savgol_filter
window_size = 5
data = list()
q = Queue()
d = [2.22, 2.22, 5.55, 2.22, 1.11, 0.01, 1.11, 4.44, 9.99, 1.11, 3.33]
for i in d:
q.put(i)
res = list()
while not q.empty():
element = q.get()
data.append(element)
length = len(data)
npd = np.array(data[length - window_size:])
if length >= window_size:
res.append(savgol_filter(npd, window_size, 2)[window_size//2])
# calculate the polynomial fit for elements 0,1,2,3,4
poly = np.polyfit(range(window_size), d[0:window_size], deg=2)
p = np.poly1d(poly)
res.insert(0, p(0)) # insert the polynomial fits at index 0 and 1
res.insert(1, p(1))
# calculate the polynomial fit for the 5 last elements (range runs like [4,3,2,1,0])
poly = np.polyfit(range(window_size-1, -1, -1), d[-window_size:], deg=2)
p = np.poly1d(poly)
res.append(p(1))
res.append(p(0))
npd = np.array(data)
res2 = savgol_filter(npd, window_size, 2)
diff = res - res2 # in your example you were calculating the wrong diff btw
np.set_printoptions(precision=2)
print('source data ', npd)
print('online res ', np.array(res))
print('offline res ', res2)
print('error ', diff.sum())
结果:
>>> Out: ('erorr ', -7.9936057773011271e-15)
编辑:
此版本独立于 d
-list,这意味着它可以消化从您的来源获取的任何数据。
window_size = 5
half_window_size = window_size // 2 # this variable is used often
data = list()
q = Queue()
d = [2.22, 2.22, 5.55, 2.22, 1.11, 0.01, 1.11, 4.44, 9.99, 1.11, 3.33]
for i in d:
q.put(i)
res = [None]*window_size # create list of correct size instead of appending
while not q.empty():
element = q.get()
data.append(element)
length = len(data)
npd = np.array(data[length - window_size:])
if length == window_size: # this is called only once, when reaching the filter-center
# calculate the polynomial fit for elements 0,1,2,3,4
poly = np.polyfit(range(window_size), data, deg=2)
p = np.poly1d(poly)
for poly_i in range(half_window_size): # independent from window_size
res[poly_i] = p(poly_i)
# insert the sav_gol-value at index 2
res[(length-1)-half_window_size] = savgol_filter(npd, window_size, 2)[half_window_size]
poly = np.polyfit(range(window_size - 1, -1, -1), data[-window_size:], deg=2)
p = np.poly1d(poly)
for poly_i_end in range(half_window_size):
res[(window_size-1)-poly_i_end] = p(poly_i_end)
elif length > window_size:
res.append(None) # add another slot in the res-list
# overwrite poly-value with savgol
res[(length-1)-half_window_size] = savgol_filter(npd, window_size, 2)[half_window_size]
# extrapolate again into the future
poly = np.polyfit(range(window_size - 1, -1, -1), data[-window_size:], deg=2)
p = np.poly1d(poly)
for poly_i_end in range(half_window_size):
res[-poly_i_end-1] = p(poly_i_end)
看起来卡尔曼滤波器系列符合我的预期。这是因为它们在 "Mean square error" 方面是最优的。例如,可以找到实现 here。
我想使用 scipy.signal 库中的 savgol_filter 过滤在线数据。但是当我尝试将它用于在线数据时(当新元素一个接一个地出现时)我意识到 savgol_filter 与在线数据相比有一些延迟 (window_length//2)使用离线数据(它们的元素可同时用于计算)。 我使用类似的代码(请见下文)
from queue import Queue, Empty
import numpy as np
from scipy.signal import savgol_filter
window_size = 5
data = list()
q = Queue()
d = [2.22, 2.22, 5.55, 2.22, 1.11, 0.01, 1.11, 4.44, 9.99, 1.11, 3.33]
for i in d:
q.put(i)
res = list()
while not q.empty():
element = q.get()
data.append(element)
length = len(data)
npd = np.array(data[length - window_size:])
if length >= window_size:
res.append(savgol_filter(npd , window_size, 2)[window_size // 2])
npd = np.array(data)
res2 = savgol_filter(npd , window_size, 2)
np.set_printoptions(precision=2)
print('source data ', npd)
print('online res ', np.array(res))
print('offline res ', res2)
- 我的假设对吗?能以某种方式纠正吗?
- 如果我是对的,能否请您建议类似的过滤器,在计算中没有这样的问题?
感谢更新您的问题!
问题是您的 online_res
方法丢失了部分数据。边缘值由 scipy 的 savgol_filter
处理,但不用于您的手动编码版本。
对于您的示例,请查看两个结果:
'online res': array([ 3.93, 3.17, 0.73, 0.2 , 1.11, 5.87, 6.37]))
'offline res': array([ 1.84, 3.52, 3.93, 3.17, 0.73, 0.2 , 1.11, 5.87, 6.37, 5.3, 1.84]))
它们是相同的,但是 offline res
处理了值 data[0:2]
和 data[-2:]
。在您的情况下,如果未指定特定 mode
,则将其设置为默认值 interpolate
:
When the ‘interp’ mode is selected (the default), no extension is used. Instead, a degree polyorder polynomial is fit to the last window_length values of the edges, and this polynomial is used to evaluate the last window_length // 2 output values.
你没有为你的 online res
做这件事。
我为双方实施了一个简单的 polynomial fit
,然后得到了完全相同的结果:
from queue import Queue, Empty
import numpy as np
from scipy.signal import savgol_filter
window_size = 5
data = list()
q = Queue()
d = [2.22, 2.22, 5.55, 2.22, 1.11, 0.01, 1.11, 4.44, 9.99, 1.11, 3.33]
for i in d:
q.put(i)
res = list()
while not q.empty():
element = q.get()
data.append(element)
length = len(data)
npd = np.array(data[length - window_size:])
if length >= window_size:
res.append(savgol_filter(npd, window_size, 2)[window_size//2])
# calculate the polynomial fit for elements 0,1,2,3,4
poly = np.polyfit(range(window_size), d[0:window_size], deg=2)
p = np.poly1d(poly)
res.insert(0, p(0)) # insert the polynomial fits at index 0 and 1
res.insert(1, p(1))
# calculate the polynomial fit for the 5 last elements (range runs like [4,3,2,1,0])
poly = np.polyfit(range(window_size-1, -1, -1), d[-window_size:], deg=2)
p = np.poly1d(poly)
res.append(p(1))
res.append(p(0))
npd = np.array(data)
res2 = savgol_filter(npd, window_size, 2)
diff = res - res2 # in your example you were calculating the wrong diff btw
np.set_printoptions(precision=2)
print('source data ', npd)
print('online res ', np.array(res))
print('offline res ', res2)
print('error ', diff.sum())
结果:
>>> Out: ('erorr ', -7.9936057773011271e-15)
编辑:
此版本独立于 d
-list,这意味着它可以消化从您的来源获取的任何数据。
window_size = 5
half_window_size = window_size // 2 # this variable is used often
data = list()
q = Queue()
d = [2.22, 2.22, 5.55, 2.22, 1.11, 0.01, 1.11, 4.44, 9.99, 1.11, 3.33]
for i in d:
q.put(i)
res = [None]*window_size # create list of correct size instead of appending
while not q.empty():
element = q.get()
data.append(element)
length = len(data)
npd = np.array(data[length - window_size:])
if length == window_size: # this is called only once, when reaching the filter-center
# calculate the polynomial fit for elements 0,1,2,3,4
poly = np.polyfit(range(window_size), data, deg=2)
p = np.poly1d(poly)
for poly_i in range(half_window_size): # independent from window_size
res[poly_i] = p(poly_i)
# insert the sav_gol-value at index 2
res[(length-1)-half_window_size] = savgol_filter(npd, window_size, 2)[half_window_size]
poly = np.polyfit(range(window_size - 1, -1, -1), data[-window_size:], deg=2)
p = np.poly1d(poly)
for poly_i_end in range(half_window_size):
res[(window_size-1)-poly_i_end] = p(poly_i_end)
elif length > window_size:
res.append(None) # add another slot in the res-list
# overwrite poly-value with savgol
res[(length-1)-half_window_size] = savgol_filter(npd, window_size, 2)[half_window_size]
# extrapolate again into the future
poly = np.polyfit(range(window_size - 1, -1, -1), data[-window_size:], deg=2)
p = np.poly1d(poly)
for poly_i_end in range(half_window_size):
res[-poly_i_end-1] = p(poly_i_end)
看起来卡尔曼滤波器系列符合我的预期。这是因为它们在 "Mean square error" 方面是最优的。例如,可以找到实现 here。