使用 Python 查找与一维局部最大值相邻的前导和尾随谷
Find leading and trailing valleys adjacent to 1-D local maxima with Python
objective是从一维信号的局部最大值列表中找到前导谷和尾随谷,如下图所示
为此,
我建议先通过scipy.signal
的find_peaks
找到peaks
。 return 出现局部最大值的索引列表。
其次,signal.argrelextrema
被用来提取所有valleys
。最后,第三步是 select 来自 find_peaks 的每个索引可以驻留的最接近的边界值对(来自山谷索引的对值)。
虽然下面的代码有效,但我想知道是否有更有效的方法(可以是 scipy 的 Numpy)来执行此操作。我注意到,argrelextrema
和 searchsorted
需要一些时间来处理很长的一维信号
重现上图的完整代码
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy import signal
from scipy.misc import electrocardiogram
from scipy.signal import find_peaks
x = electrocardiogram()[2000:2500]
peaks, _ = find_peaks(x, height=0.7)
valley_indexes = signal.argrelextrema(x, np.less)[0]
i = np.searchsorted(valley_indexes, peaks)
m = ~np.isin(i, [0, len(valley_indexes)])
df = pd.DataFrame({'peaks_point': peaks})
df.loc[m, 'leftp'], df.loc[m, 'rightp'] = valley_indexes[i[m] - 1], valley_indexes[i[m]]
leftp=df['leftp'].to_numpy().astype(int)
rightp=df['rightp'].to_numpy().astype(int)
plt.plot(x)
plt.plot(peaks, x[peaks], "x",label='Local Maxima')
plt.scatter(leftp, x[leftp],marker='x',s=100,alpha=0.7,label='leading_valleys')
plt.scatter(rightp,x[rightp],marker='x',s=100,alpha=0.7,label='Trailing_valleys')
plt.plot(np.zeros_like(x), "--", color="gray")
plt.legend()
plt.show()
我认为没有比 numpy/scipy 更快的方法了。明显的优化是在峰值周围的第一个局部最小值处停止搜索。不幸的是,当用纯 python 编写时,示例输入的速度较慢(至少在 argrelextrema
方法中的中间数组填满我的 RAM 之前)。
不过,如果你可以使用 numba,这种优化会快得多:
10800000 points, 98900 peaks (54.29 ms for peaks):
argrelextrema: 143.04 ms
optimized python: 572.55 ms
optimized numba: 8.04 ms
import time
import numpy as np
from numba import jit
from scipy import signal
from scipy.misc import electrocardiogram
from scipy.signal import find_peaks
def argrelextrema(arr, peaks):
valley_indexes = signal.argrelextrema(arr, np.less)[0]
i = np.searchsorted(valley_indexes, peaks)
m = ~np.isin(i, [0, len(valley_indexes)])
return valley_indexes[i[m] - 1], valley_indexes[i[m]]
def find_neighboring_valleys(arr, peak_idx):
right = -1
for i in range(peak_idx + 1, len(arr) - 1):
if arr[i - 1] > arr[i] < arr[i + 1]:
right = i
break
left = -1
for i in range(peak_idx - 1, 0, -1):
if arr[i - 1] > arr[i] < arr[i + 1]:
left = i
break
return left, right
def optimized_python(arr, peaks):
lefts = np.empty_like(peaks)
rights = np.empty_like(peaks)
for i, peak in enumerate(peaks):
lefts[i], rights[i] = find_neighboring_valleys(arr, peak)
return lefts, rights
find_neighboring_valleys_numba = jit(find_neighboring_valleys)
@jit
def optimized_numba(arr, peaks):
lefts = np.empty_like(peaks)
rights = np.empty_like(peaks)
for i, peak in enumerate(peaks):
lefts[i], rights[i] = find_neighboring_valleys_numba(arr, peak)
return lefts, rights
x = np.tile(electrocardiogram(), 100)
start = time.perf_counter()
peaks, _ = find_peaks(x, height=0.7)
elapsed = time.perf_counter() - start
print(f"{len(x)} points, {len(peaks)} peaks ({elapsed*1000:.2f} ms for peaks):")
start = time.perf_counter()
leftp, rightp = argrelextrema(x, peaks)
elapsed = time.perf_counter() - start
print(f"argrelextrema: {elapsed*1000:6.2f} ms")
start = time.perf_counter()
leftp, rightp = optimized_python(x, peaks)
elapsed = time.perf_counter() - start
print(f"optimized python: {elapsed*1000:6.2f} ms")
# warmup the jit
optimized_numba(x, peaks)
start = time.perf_counter()
leftp, rightp = optimized_numba(x, peaks)
elapsed = time.perf_counter() - start
print(f"optimized numba: {elapsed*1000:6.2f} ms")
另一种方法是先反转x
,将谷反转为峰。
peaks_valley, _ = find_peaks(x*-1, height=0)
和select最接近的边界值对(来自山谷索引的对值)来自find_peaks的每个索引可以驻留。
完整代码以及与argrelextrema
的对比如下:
import time
import numpy as np
import pandas as pd
from scipy import signal
from scipy.misc import electrocardiogram
from scipy.signal import find_peaks
def _get_vally_idx_pd(valley_indexes, peaks):
i = np.searchsorted(valley_indexes, peaks)
m = ~np.isin(i, [0, len(valley_indexes)])
df = pd.DataFrame({'peaks_point': peaks})
df.loc[m, 'leftp'], df.loc[m, 'rightp'] = valley_indexes[i[m] - 1], valley_indexes[i[m]]
return df
def argrelEx_approach(peaks,x):
valley_indexes = signal.argrelextrema(x, np.less)[0]
df=_get_vally_idx_pd(valley_indexes, peaks)
print(df)
def _invert_approach(peaks,x):
peaks_valley, _ = find_peaks(x*-1, height=0)
df=_get_vally_idx_pd(peaks_valley, peaks)
print(df)
x = electrocardiogram()[2000:2500]
peaks, _ = find_peaks(x, height=0.7)
start1 = time.perf_counter()
argrelEx_approach(peaks,x)
end1 = time.perf_counter()
time_taken = end1 - start1
start2 = time.perf_counter()
_invert_approach(peaks,x)
end2 = time.perf_counter()
print('Time to complete argrelextrema : ',end1 - start1)
print('Time to complete inverting the signal: ',end2 - start2)
输出
Time to complete argrelextrema : 0.004720643861219287
Time to complete inverting the signal: 0.0034988040570169687
这也比 的建议更快。
objective是从一维信号的局部最大值列表中找到前导谷和尾随谷,如下图所示
为此,
我建议先通过scipy.signal
的find_peaks
找到peaks
。 return 出现局部最大值的索引列表。
其次,signal.argrelextrema
被用来提取所有valleys
。最后,第三步是 select 来自 find_peaks 的每个索引可以驻留的最接近的边界值对(来自山谷索引的对值)。
虽然下面的代码有效,但我想知道是否有更有效的方法(可以是 scipy 的 Numpy)来执行此操作。我注意到,argrelextrema
和 searchsorted
需要一些时间来处理很长的一维信号
重现上图的完整代码
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy import signal
from scipy.misc import electrocardiogram
from scipy.signal import find_peaks
x = electrocardiogram()[2000:2500]
peaks, _ = find_peaks(x, height=0.7)
valley_indexes = signal.argrelextrema(x, np.less)[0]
i = np.searchsorted(valley_indexes, peaks)
m = ~np.isin(i, [0, len(valley_indexes)])
df = pd.DataFrame({'peaks_point': peaks})
df.loc[m, 'leftp'], df.loc[m, 'rightp'] = valley_indexes[i[m] - 1], valley_indexes[i[m]]
leftp=df['leftp'].to_numpy().astype(int)
rightp=df['rightp'].to_numpy().astype(int)
plt.plot(x)
plt.plot(peaks, x[peaks], "x",label='Local Maxima')
plt.scatter(leftp, x[leftp],marker='x',s=100,alpha=0.7,label='leading_valleys')
plt.scatter(rightp,x[rightp],marker='x',s=100,alpha=0.7,label='Trailing_valleys')
plt.plot(np.zeros_like(x), "--", color="gray")
plt.legend()
plt.show()
我认为没有比 numpy/scipy 更快的方法了。明显的优化是在峰值周围的第一个局部最小值处停止搜索。不幸的是,当用纯 python 编写时,示例输入的速度较慢(至少在 argrelextrema
方法中的中间数组填满我的 RAM 之前)。
不过,如果你可以使用 numba,这种优化会快得多:
10800000 points, 98900 peaks (54.29 ms for peaks):
argrelextrema: 143.04 ms
optimized python: 572.55 ms
optimized numba: 8.04 ms
import time
import numpy as np
from numba import jit
from scipy import signal
from scipy.misc import electrocardiogram
from scipy.signal import find_peaks
def argrelextrema(arr, peaks):
valley_indexes = signal.argrelextrema(arr, np.less)[0]
i = np.searchsorted(valley_indexes, peaks)
m = ~np.isin(i, [0, len(valley_indexes)])
return valley_indexes[i[m] - 1], valley_indexes[i[m]]
def find_neighboring_valleys(arr, peak_idx):
right = -1
for i in range(peak_idx + 1, len(arr) - 1):
if arr[i - 1] > arr[i] < arr[i + 1]:
right = i
break
left = -1
for i in range(peak_idx - 1, 0, -1):
if arr[i - 1] > arr[i] < arr[i + 1]:
left = i
break
return left, right
def optimized_python(arr, peaks):
lefts = np.empty_like(peaks)
rights = np.empty_like(peaks)
for i, peak in enumerate(peaks):
lefts[i], rights[i] = find_neighboring_valleys(arr, peak)
return lefts, rights
find_neighboring_valleys_numba = jit(find_neighboring_valleys)
@jit
def optimized_numba(arr, peaks):
lefts = np.empty_like(peaks)
rights = np.empty_like(peaks)
for i, peak in enumerate(peaks):
lefts[i], rights[i] = find_neighboring_valleys_numba(arr, peak)
return lefts, rights
x = np.tile(electrocardiogram(), 100)
start = time.perf_counter()
peaks, _ = find_peaks(x, height=0.7)
elapsed = time.perf_counter() - start
print(f"{len(x)} points, {len(peaks)} peaks ({elapsed*1000:.2f} ms for peaks):")
start = time.perf_counter()
leftp, rightp = argrelextrema(x, peaks)
elapsed = time.perf_counter() - start
print(f"argrelextrema: {elapsed*1000:6.2f} ms")
start = time.perf_counter()
leftp, rightp = optimized_python(x, peaks)
elapsed = time.perf_counter() - start
print(f"optimized python: {elapsed*1000:6.2f} ms")
# warmup the jit
optimized_numba(x, peaks)
start = time.perf_counter()
leftp, rightp = optimized_numba(x, peaks)
elapsed = time.perf_counter() - start
print(f"optimized numba: {elapsed*1000:6.2f} ms")
另一种方法是先反转x
,将谷反转为峰。
peaks_valley, _ = find_peaks(x*-1, height=0)
和select最接近的边界值对(来自山谷索引的对值)来自find_peaks的每个索引可以驻留。
完整代码以及与argrelextrema
的对比如下:
import time
import numpy as np
import pandas as pd
from scipy import signal
from scipy.misc import electrocardiogram
from scipy.signal import find_peaks
def _get_vally_idx_pd(valley_indexes, peaks):
i = np.searchsorted(valley_indexes, peaks)
m = ~np.isin(i, [0, len(valley_indexes)])
df = pd.DataFrame({'peaks_point': peaks})
df.loc[m, 'leftp'], df.loc[m, 'rightp'] = valley_indexes[i[m] - 1], valley_indexes[i[m]]
return df
def argrelEx_approach(peaks,x):
valley_indexes = signal.argrelextrema(x, np.less)[0]
df=_get_vally_idx_pd(valley_indexes, peaks)
print(df)
def _invert_approach(peaks,x):
peaks_valley, _ = find_peaks(x*-1, height=0)
df=_get_vally_idx_pd(peaks_valley, peaks)
print(df)
x = electrocardiogram()[2000:2500]
peaks, _ = find_peaks(x, height=0.7)
start1 = time.perf_counter()
argrelEx_approach(peaks,x)
end1 = time.perf_counter()
time_taken = end1 - start1
start2 = time.perf_counter()
_invert_approach(peaks,x)
end2 = time.perf_counter()
print('Time to complete argrelextrema : ',end1 - start1)
print('Time to complete inverting the signal: ',end2 - start2)
输出
Time to complete argrelextrema : 0.004720643861219287
Time to complete inverting the signal: 0.0034988040570169687
这也比