numpy:累积'greater'操作

numpy: accumulate 'greater' operation

我正在尝试编写一个函数来检测所有上升沿 - 向量中值超过特定阈值的索引。此处描述了类似的内容:,但我想添加滞后,以便触发器不会触发,除非该值低于另一个限制。

我想出了以下代码:

import numpy as np

arr = np.linspace(-10, 10, 60)
sample_values = np.sin(arr) + 0.6 * np.sin(arr*3)

above_trigger = sample_values > 0.6
below_deadband = sample_values < 0.0
combined = 1 * above_trigger - 1 * below_deadband

现在在 combined 数组中有 1 原始值高于上限,-1 低于下限,0值介于:

>>> combined
array([ 1,  1, -1, -1, -1, -1, -1, -1, -1, -1, -1,  0,  1,  1,  1,  0,  0,
        1,  1,  1,  0, -1, -1, -1, -1, -1, -1, -1, -1, -1,  0,  1,  1,  1,
        0,  0,  1,  1,  1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,  1,  1,
        1,  0,  0,  1,  1,  1,  0, -1, -1])

我的想法是使用一些聪明的函数来顺序处理这个向量,并将所有零序列替换为它们前面的任何非零值。那么问题将归结为简单地找到值从 -1 变为 1.

的位置

我认为如果正确使用 greater 操作将实现此目的:-1 编码为 True1 编码为 False:

但是结果不是我所期望的:

>>> 1 - 2 * np.greater.accumulate(combined)
array([-1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,
        1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,
        1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1,
        1,  1,  1,  1,  1,  1,  1,  1,  1])

在这种情况下,greater 函数似乎没有正确地将布尔值与数值进行比较,即使它在标量或成对上使用时工作正常:

>>> np.greater(False, -1)
True
>>> np.greater.outer(False, combined)
array([False, False,  True,  True,  True,  True,  True,  True,  True,
        True,  True, False, False, False, False, False, False, False,
       False, False, False,  True,  True,  True,  True,  True,  True,
        True,  True,  True, False, False, False, False, False, False,
       False, False, False,  True,  True,  True,  True,  True,  True,
        True,  True,  True,  True, False, False, False, False, False,
       False, False, False, False,  True,  True])

这是预期的行为吗?我是不是做错了什么,有什么办法解决这个问题吗?

或者,也许您可​​以建议另一种方法来解决这个问题?

谢谢。

我不确定 np.greater.accumulate 的问题是什么(它似乎确实不像宣传的那样表现),但以下应该有效:

import numpy as np
import numpy as np

arr = np.linspace(-10, 10, 60)
sample_values = np.sin(arr) + 0.6 * np.sin(arr*3)

above_trigger = sample_values > 0.6
below_deadband = sample_values < 0.0
combined = 1 * above_trigger - 1 * below_deadband

mask = combined != 0
idx = np.where(mask,np.arange(len(mask)),0)
idx = np.maximum.accumulate(idx)
result = combined[idx]

print(f"combined:\n {combined}\n")
print(f"result:\n {result}")

它给出:

combined:
 [ 1  1 -1 -1 -1 -1 -1 -1 -1 -1 -1  0  1  1  1  0  0  1  1  1  0 -1 -1 -1
 -1 -1 -1 -1 -1 -1  0  1  1  1  0  0  1  1  1 -1 -1 -1 -1 -1 -1 -1 -1 -1
 -1  1  1  1  0  0  1  1  1  0 -1 -1]

result:
 [ 1  1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1  1  1  1  1  1  1  1  1  1 -1 -1 -1
 -1 -1 -1 -1 -1 -1 -1  1  1  1  1  1  1  1  1 -1 -1 -1 -1 -1 -1 -1 -1 -1
 -1  1  1  1  1  1  1  1  1  1 -1 -1]

那么可以得到从-1跳到1的索引如下:

np.nonzero(result[1:] > result[:-1])[0] + 1

它给出:

array([12, 31, 49])

我一直在开发一个名为 ufunclab that includes the function hysteresis_relay 的软件包,它可以满足您的需求。我没有把它放在 PyPI 上,所以你必须获取源代码并自己构建才能使用它。

In [122]: import numpy as np

In [123]: from ufunclab import hysteresis_relay

In [124]: arr = np.linspace(-10, 10, 60)

In [125]: sample_values = np.sin(arr) + 0.6 * np.sin(arr*3)

In [126]: hysteresis_relay(sample_values, 0.0, 0.6, -1, 1, 1).astype(int)
Out[126]: 
array([ 1,  1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,  1,  1,  1,  1,  1,
        1,  1,  1,  1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,  1,  1,  1,
        1,  1,  1,  1,  1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,  1,  1,
        1,  1,  1,  1,  1,  1,  1, -1, -1])

另一种选择是使用Pandas(但我怀疑@bb1 的答案会比这更有效,并且@bb1 的答案避免依赖于另一个库)。

  1. combined 转换为 Pandas 系列。
  2. 用系列中的 pd.NA 替换 0。
  3. 使用方法 fillna()method='ffill' 来“向前填充”NA 值。
  4. 使用 to_numpy() 方法将系列转换回 NumPy 数组。
In [107]: combined
Out[107]: 
array([ 1,  1, -1, -1, -1, -1, -1, -1, -1, -1, -1,  0,  1,  1,  1,  0,  0,
        1,  1,  1,  0, -1, -1, -1, -1, -1, -1, -1, -1, -1,  0,  1,  1,  1,
        0,  0,  1,  1,  1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,  1,  1,
        1,  0,  0,  1,  1,  1,  0, -1, -1])

In [108]: import pandas as pd

In [109]: pd.Series(combined).replace(0, pd.NA).fillna(method='ffill').to_numpy()
Out[109]: 
array([ 1,  1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,  1,  1,  1,  1,  1,
        1,  1,  1,  1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,  1,  1,  1,
        1,  1,  1,  1,  1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,  1,  1,
        1,  1,  1,  1,  1,  1,  1, -1, -1])

这是另一个简单的解决方案:

def gen(arr, start=0):
    y = start
    for x in arr:
        if x != 0:
            y = x
        yield y

g = gen(combined)
# set count for performance
np.fromiter(g, dtype=int, count=combined.size)

>>> array([ 1,  1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,  1,  1,  1,  1,  1,
    1,  1,  1,  1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,  1,  1,  1,
    1,  1,  1,  1,  1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,  1,  1,
    1,  1,  1,  1,  1,  1,  1, -1, -1])

你可以写一个类似的生成器或循环来直接检测跳转:

p = 0
for i, x in enumerate(combined):
    if x - p == 2:
        print(i)
        break
    if x != 0:
        p = x

combined[i-5:i+1]
>>> 12
>>> array([-1, -1, -1, -1,  0,  1])

谢谢大家的回答。

作为记录,以下是所提议解决方案的时间结果:

import numpy as np
import pandas as pd
import ufunclab

arr = np.linspace(-10, 10, 600)
values = np.sin(arr)+0.6*np.sin(arr*3)


def trigger_using_greater(values):
    # This doesn't give correct results
    combined = 1 * (values > 0.6) - 1 * (values < 0)
    return 1 - 2 * np.greater.accumulate(combined)


def trigger_using_masked_indexes(values):
    combined = 1 * (values > 0.6) - 1 * (values < 0)
    mask = combined != 0
    idx = np.where(mask, np.arange(len(mask)), 0)
    idx = np.maximum.accumulate(idx)
    return combined[idx]


def trigger_using_hysteresis_relay(values):
    result = ufunclab.hysteresis_relay(values, 0.0, 0.6, -1, 1, 1).astype(int)
    return result


def trigger_using_pandas(values):
    combined = 1 * (values > 0.6) - 1 * (values < 0)
    result = pd.Series(combined).replace(0, pd.NA).fillna(method='ffill').to_numpy()
    return result

def gen(arr, start=0):
    y = start
    for x in arr:
        if x != 0:
            y = x
        yield y

def trigger_using_generator(values):
    combined = 1 * (values > 0.6) - 1 * (values < 0)
    g = gen(combined)
    return np.fromiter(g, dtype=int, count=combined.size)
In [8]: %timeit trigger_using_greater(values)
21.9 µs ± 1.3 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

In [9]: %timeit trigger_using_masked_indexes(values)
26.8 µs ± 563 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)

In [10]: %timeit trigger_using_hysteresis_relay(values)
7.31 µs ± 759 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

In [11]: %timeit trigger_using_pandas(values)
755 µs ± 63.2 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

In [12]: %timeit trigger_using_generator(values)
165 µs ± 3.37 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

hysteresis_relay 是明显的赢家,但以编译 ufunclab 包为代价。顺便说一句,非常有用的包。 Warren,考虑将其发布到 PyPI。理想情况下,我希望至少看到其中一些功能集成到 SciPy.

屏蔽索引解决方案几乎和我原来的(不起作用的)解决方案一样快,并且不需要外部库。

Pandas 解决方案出奇地慢,甚至比标准 Python 生成器还要慢。