使用 Python 查找与一维局部最大值相邻的前导和尾随谷

Find leading and trailing valleys adjacent to 1-D local maxima with Python

objective是从一维信号的局部最大值列表中找到前导谷和尾随谷,如下图所示

为此,

我建议先通过scipy.signalfind_peaks找到peaks。 return 出现局部最大值的索引列表。

其次,signal.argrelextrema被用来提取所有valleys。最后,第三步是 select 来自 find_peaks 的每个索引可以驻留的最接近的边界值对(来自山谷索引的对值)。

虽然下面的代码有效,但我想知道是否有更有效的方法(可以是 scipy 的 Numpy)来执行此操作。我注意到,argrelextremasearchsorted 需要一些时间来处理很长的一维信号

重现上图的完整代码

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy import signal
from scipy.misc import electrocardiogram
from scipy.signal import find_peaks

x = electrocardiogram()[2000:2500]

peaks, _ = find_peaks(x, height=0.7)

valley_indexes = signal.argrelextrema(x, np.less)[0]
i = np.searchsorted(valley_indexes, peaks)
m = ~np.isin(i, [0, len(valley_indexes)])

df = pd.DataFrame({'peaks_point': peaks})
df.loc[m, 'leftp'], df.loc[m, 'rightp'] = valley_indexes[i[m] - 1], valley_indexes[i[m]]



leftp=df['leftp'].to_numpy().astype(int)
rightp=df['rightp'].to_numpy().astype(int)


plt.plot(x)
plt.plot(peaks, x[peaks], "x",label='Local Maxima')

plt.scatter(leftp, x[leftp],marker='x',s=100,alpha=0.7,label='leading_valleys')
plt.scatter(rightp,x[rightp],marker='x',s=100,alpha=0.7,label='Trailing_valleys')

plt.plot(np.zeros_like(x), "--", color="gray")
plt.legend()
plt.show()

我认为没有比 numpy/scipy 更快的方法了。明显的优化是在峰值周围的第一个局部最小值处停止搜索。不幸的是,当用纯 python 编写时,示例输入的速度较慢(至少在 argrelextrema 方法中的中间数组填满我的 RAM 之前)。

不过,如果你可以使用 numba,这种优化会快得多:

10800000 points, 98900 peaks (54.29 ms for peaks):
argrelextrema:    143.04 ms
optimized python: 572.55 ms
optimized numba:    8.04 ms
import time
import numpy as np
from numba import jit
from scipy import signal
from scipy.misc import electrocardiogram
from scipy.signal import find_peaks

def argrelextrema(arr, peaks):
    valley_indexes = signal.argrelextrema(arr, np.less)[0]
    i = np.searchsorted(valley_indexes, peaks)
    m = ~np.isin(i, [0, len(valley_indexes)])
    return valley_indexes[i[m] - 1], valley_indexes[i[m]]

def find_neighboring_valleys(arr, peak_idx):
    right = -1
    for i in range(peak_idx + 1, len(arr) - 1):
        if arr[i - 1] > arr[i] < arr[i + 1]:
            right = i
            break
    left = -1
    for i in range(peak_idx - 1, 0, -1):
        if arr[i - 1] > arr[i] < arr[i + 1]:
            left = i
            break
    return left, right

def optimized_python(arr, peaks):
    lefts = np.empty_like(peaks)
    rights = np.empty_like(peaks)
    for i, peak in enumerate(peaks):
        lefts[i], rights[i] = find_neighboring_valleys(arr, peak)
    return lefts, rights

find_neighboring_valleys_numba = jit(find_neighboring_valleys)

@jit
def optimized_numba(arr, peaks):
    lefts = np.empty_like(peaks)
    rights = np.empty_like(peaks)
    for i, peak in enumerate(peaks):
        lefts[i], rights[i] = find_neighboring_valleys_numba(arr, peak)
    return lefts, rights


x = np.tile(electrocardiogram(), 100)

start = time.perf_counter()
peaks, _ = find_peaks(x, height=0.7)
elapsed = time.perf_counter() - start
print(f"{len(x)} points, {len(peaks)} peaks ({elapsed*1000:.2f} ms for peaks):")

start = time.perf_counter()
leftp, rightp = argrelextrema(x, peaks)
elapsed = time.perf_counter() - start
print(f"argrelextrema:    {elapsed*1000:6.2f} ms")

start = time.perf_counter()
leftp, rightp = optimized_python(x, peaks)
elapsed = time.perf_counter() - start
print(f"optimized python: {elapsed*1000:6.2f} ms")

# warmup the jit
optimized_numba(x, peaks)
start = time.perf_counter()
leftp, rightp = optimized_numba(x, peaks)
elapsed = time.perf_counter() - start
print(f"optimized numba:  {elapsed*1000:6.2f} ms")

另一种方法是先反转x,将谷反转为峰。

peaks_valley, _ = find_peaks(x*-1, height=0)

和select最接近的边界值对(来自山谷索引的对值)来自find_peaks的每个索引可以驻留。

完整代码以及与argrelextrema的对比如下:

import time

import numpy as np
import pandas as pd
from scipy import signal
from scipy.misc import electrocardiogram
from scipy.signal import find_peaks


def _get_vally_idx_pd(valley_indexes, peaks):
    i = np.searchsorted(valley_indexes, peaks)
    m = ~np.isin(i, [0, len(valley_indexes)])

    df = pd.DataFrame({'peaks_point': peaks})
    df.loc[m, 'leftp'], df.loc[m, 'rightp'] = valley_indexes[i[m] - 1], valley_indexes[i[m]]
    return df

def argrelEx_approach(peaks,x):
    valley_indexes = signal.argrelextrema(x, np.less)[0]
    df=_get_vally_idx_pd(valley_indexes, peaks)
    print(df)

def _invert_approach(peaks,x):
    peaks_valley, _ = find_peaks(x*-1, height=0)
    df=_get_vally_idx_pd(peaks_valley, peaks)
    print(df)

x = electrocardiogram()[2000:2500]

peaks, _ = find_peaks(x, height=0.7)


start1 = time.perf_counter()
argrelEx_approach(peaks,x)
end1 = time.perf_counter()
time_taken = end1 - start1
start2 = time.perf_counter()
_invert_approach(peaks,x)
end2 = time.perf_counter()
    
print('Time to complete argrelextrema : ',end1 - start1)
print('Time to complete inverting the signal: ',end2 - start2)

输出

Time to complete argrelextrema :  0.004720643861219287
Time to complete inverting the signal:  0.0034988040570169687

这也比 的建议更快。