numpy:累积'greater'操作
numpy: accumulate 'greater' operation
我正在尝试编写一个函数来检测所有上升沿 - 向量中值超过特定阈值的索引。此处描述了类似的内容:,但我想添加滞后,以便触发器不会触发,除非该值低于另一个限制。
我想出了以下代码:
import numpy as np
arr = np.linspace(-10, 10, 60)
sample_values = np.sin(arr) + 0.6 * np.sin(arr*3)
above_trigger = sample_values > 0.6
below_deadband = sample_values < 0.0
combined = 1 * above_trigger - 1 * below_deadband
现在在 combined
数组中有 1
原始值高于上限,-1
低于下限,0
值介于:
>>> combined
array([ 1, 1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 0, 1, 1, 1, 0, 0,
1, 1, 1, 0, -1, -1, -1, -1, -1, -1, -1, -1, -1, 0, 1, 1, 1,
0, 0, 1, 1, 1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 1, 1,
1, 0, 0, 1, 1, 1, 0, -1, -1])
我的想法是使用一些聪明的函数来顺序处理这个向量,并将所有零序列替换为它们前面的任何非零值。那么问题将归结为简单地找到值从 -1
变为 1
.
的位置
我认为如果正确使用 greater
操作将实现此目的:-1
编码为 True
和 1
编码为 False
:
- (
True
("-1") > -1
) -> True
("-1")
- (
True
("-1") > 1
) -> False
("1")
- (
True
("-1") > 0
) -> True
("-1")
- (
False
("1") > -1
) -> True
("-1")
- (
False
("1") > 1
) -> False
("1")
- (
False
("1") > 0
) -> False
("1")
但是结果不是我所期望的:
>>> 1 - 2 * np.greater.accumulate(combined)
array([-1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
1, 1, 1, 1, 1, 1, 1, 1, 1])
在这种情况下,greater
函数似乎没有正确地将布尔值与数值进行比较,即使它在标量或成对上使用时工作正常:
>>> np.greater(False, -1)
True
>>> np.greater.outer(False, combined)
array([False, False, True, True, True, True, True, True, True,
True, True, False, False, False, False, False, False, False,
False, False, False, True, True, True, True, True, True,
True, True, True, False, False, False, False, False, False,
False, False, False, True, True, True, True, True, True,
True, True, True, True, False, False, False, False, False,
False, False, False, False, True, True])
这是预期的行为吗?我是不是做错了什么,有什么办法解决这个问题吗?
或者,也许您可以建议另一种方法来解决这个问题?
谢谢。
我不确定 np.greater.accumulate
的问题是什么(它似乎确实不像宣传的那样表现),但以下应该有效:
import numpy as np
import numpy as np
arr = np.linspace(-10, 10, 60)
sample_values = np.sin(arr) + 0.6 * np.sin(arr*3)
above_trigger = sample_values > 0.6
below_deadband = sample_values < 0.0
combined = 1 * above_trigger - 1 * below_deadband
mask = combined != 0
idx = np.where(mask,np.arange(len(mask)),0)
idx = np.maximum.accumulate(idx)
result = combined[idx]
print(f"combined:\n {combined}\n")
print(f"result:\n {result}")
它给出:
combined:
[ 1 1 -1 -1 -1 -1 -1 -1 -1 -1 -1 0 1 1 1 0 0 1 1 1 0 -1 -1 -1
-1 -1 -1 -1 -1 -1 0 1 1 1 0 0 1 1 1 -1 -1 -1 -1 -1 -1 -1 -1 -1
-1 1 1 1 0 0 1 1 1 0 -1 -1]
result:
[ 1 1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 1 1 1 1 1 1 1 1 1 -1 -1 -1
-1 -1 -1 -1 -1 -1 -1 1 1 1 1 1 1 1 1 -1 -1 -1 -1 -1 -1 -1 -1 -1
-1 1 1 1 1 1 1 1 1 1 -1 -1]
那么可以得到从-1跳到1的索引如下:
np.nonzero(result[1:] > result[:-1])[0] + 1
它给出:
array([12, 31, 49])
我一直在开发一个名为 ufunclab
that includes the function hysteresis_relay
的软件包,它可以满足您的需求。我没有把它放在 PyPI 上,所以你必须获取源代码并自己构建才能使用它。
In [122]: import numpy as np
In [123]: from ufunclab import hysteresis_relay
In [124]: arr = np.linspace(-10, 10, 60)
In [125]: sample_values = np.sin(arr) + 0.6 * np.sin(arr*3)
In [126]: hysteresis_relay(sample_values, 0.0, 0.6, -1, 1, 1).astype(int)
Out[126]:
array([ 1, 1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 1, 1, 1, 1, 1,
1, 1, 1, 1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 1, 1, 1,
1, 1, 1, 1, 1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 1, 1,
1, 1, 1, 1, 1, 1, 1, -1, -1])
另一种选择是使用Pandas(但我怀疑@bb1 的答案会比这更有效,并且@bb1 的答案避免依赖于另一个库)。
- 将
combined
转换为 Pandas 系列。
- 用系列中的
pd.NA
替换 0。
- 使用方法
fillna()
和 method='ffill'
来“向前填充”NA
值。
- 使用
to_numpy()
方法将系列转换回 NumPy 数组。
In [107]: combined
Out[107]:
array([ 1, 1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 0, 1, 1, 1, 0, 0,
1, 1, 1, 0, -1, -1, -1, -1, -1, -1, -1, -1, -1, 0, 1, 1, 1,
0, 0, 1, 1, 1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 1, 1,
1, 0, 0, 1, 1, 1, 0, -1, -1])
In [108]: import pandas as pd
In [109]: pd.Series(combined).replace(0, pd.NA).fillna(method='ffill').to_numpy()
Out[109]:
array([ 1, 1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 1, 1, 1, 1, 1,
1, 1, 1, 1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 1, 1, 1,
1, 1, 1, 1, 1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 1, 1,
1, 1, 1, 1, 1, 1, 1, -1, -1])
这是另一个简单的解决方案:
def gen(arr, start=0):
y = start
for x in arr:
if x != 0:
y = x
yield y
g = gen(combined)
# set count for performance
np.fromiter(g, dtype=int, count=combined.size)
>>> array([ 1, 1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 1, 1, 1, 1, 1,
1, 1, 1, 1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 1, 1, 1,
1, 1, 1, 1, 1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 1, 1,
1, 1, 1, 1, 1, 1, 1, -1, -1])
你可以写一个类似的生成器或循环来直接检测跳转:
p = 0
for i, x in enumerate(combined):
if x - p == 2:
print(i)
break
if x != 0:
p = x
combined[i-5:i+1]
>>> 12
>>> array([-1, -1, -1, -1, 0, 1])
谢谢大家的回答。
作为记录,以下是所提议解决方案的时间结果:
import numpy as np
import pandas as pd
import ufunclab
arr = np.linspace(-10, 10, 600)
values = np.sin(arr)+0.6*np.sin(arr*3)
def trigger_using_greater(values):
# This doesn't give correct results
combined = 1 * (values > 0.6) - 1 * (values < 0)
return 1 - 2 * np.greater.accumulate(combined)
def trigger_using_masked_indexes(values):
combined = 1 * (values > 0.6) - 1 * (values < 0)
mask = combined != 0
idx = np.where(mask, np.arange(len(mask)), 0)
idx = np.maximum.accumulate(idx)
return combined[idx]
def trigger_using_hysteresis_relay(values):
result = ufunclab.hysteresis_relay(values, 0.0, 0.6, -1, 1, 1).astype(int)
return result
def trigger_using_pandas(values):
combined = 1 * (values > 0.6) - 1 * (values < 0)
result = pd.Series(combined).replace(0, pd.NA).fillna(method='ffill').to_numpy()
return result
def gen(arr, start=0):
y = start
for x in arr:
if x != 0:
y = x
yield y
def trigger_using_generator(values):
combined = 1 * (values > 0.6) - 1 * (values < 0)
g = gen(combined)
return np.fromiter(g, dtype=int, count=combined.size)
In [8]: %timeit trigger_using_greater(values)
21.9 µs ± 1.3 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
In [9]: %timeit trigger_using_masked_indexes(values)
26.8 µs ± 563 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
In [10]: %timeit trigger_using_hysteresis_relay(values)
7.31 µs ± 759 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
In [11]: %timeit trigger_using_pandas(values)
755 µs ± 63.2 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
In [12]: %timeit trigger_using_generator(values)
165 µs ± 3.37 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
hysteresis_relay
是明显的赢家,但以编译 ufunclab
包为代价。顺便说一句,非常有用的包。 Warren,考虑将其发布到 PyPI。理想情况下,我希望至少看到其中一些功能集成到 SciPy.
中
屏蔽索引解决方案几乎和我原来的(不起作用的)解决方案一样快,并且不需要外部库。
Pandas 解决方案出奇地慢,甚至比标准 Python 生成器还要慢。
我正在尝试编写一个函数来检测所有上升沿 - 向量中值超过特定阈值的索引。此处描述了类似的内容:
我想出了以下代码:
import numpy as np
arr = np.linspace(-10, 10, 60)
sample_values = np.sin(arr) + 0.6 * np.sin(arr*3)
above_trigger = sample_values > 0.6
below_deadband = sample_values < 0.0
combined = 1 * above_trigger - 1 * below_deadband
现在在 combined
数组中有 1
原始值高于上限,-1
低于下限,0
值介于:
>>> combined
array([ 1, 1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 0, 1, 1, 1, 0, 0,
1, 1, 1, 0, -1, -1, -1, -1, -1, -1, -1, -1, -1, 0, 1, 1, 1,
0, 0, 1, 1, 1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 1, 1,
1, 0, 0, 1, 1, 1, 0, -1, -1])
我的想法是使用一些聪明的函数来顺序处理这个向量,并将所有零序列替换为它们前面的任何非零值。那么问题将归结为简单地找到值从 -1
变为 1
.
我认为如果正确使用 greater
操作将实现此目的:-1
编码为 True
和 1
编码为 False
:
- (
True
("-1") >-1
) ->True
("-1") - (
True
("-1") >1
) ->False
("1") - (
True
("-1") >0
) ->True
("-1") - (
False
("1") >-1
) ->True
("-1") - (
False
("1") >1
) ->False
("1") - (
False
("1") >0
) ->False
("1")
但是结果不是我所期望的:
>>> 1 - 2 * np.greater.accumulate(combined)
array([-1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
1, 1, 1, 1, 1, 1, 1, 1, 1])
在这种情况下,greater
函数似乎没有正确地将布尔值与数值进行比较,即使它在标量或成对上使用时工作正常:
>>> np.greater(False, -1)
True
>>> np.greater.outer(False, combined)
array([False, False, True, True, True, True, True, True, True,
True, True, False, False, False, False, False, False, False,
False, False, False, True, True, True, True, True, True,
True, True, True, False, False, False, False, False, False,
False, False, False, True, True, True, True, True, True,
True, True, True, True, False, False, False, False, False,
False, False, False, False, True, True])
这是预期的行为吗?我是不是做错了什么,有什么办法解决这个问题吗?
或者,也许您可以建议另一种方法来解决这个问题?
谢谢。
我不确定 np.greater.accumulate
的问题是什么(它似乎确实不像宣传的那样表现),但以下应该有效:
import numpy as np
import numpy as np
arr = np.linspace(-10, 10, 60)
sample_values = np.sin(arr) + 0.6 * np.sin(arr*3)
above_trigger = sample_values > 0.6
below_deadband = sample_values < 0.0
combined = 1 * above_trigger - 1 * below_deadband
mask = combined != 0
idx = np.where(mask,np.arange(len(mask)),0)
idx = np.maximum.accumulate(idx)
result = combined[idx]
print(f"combined:\n {combined}\n")
print(f"result:\n {result}")
它给出:
combined:
[ 1 1 -1 -1 -1 -1 -1 -1 -1 -1 -1 0 1 1 1 0 0 1 1 1 0 -1 -1 -1
-1 -1 -1 -1 -1 -1 0 1 1 1 0 0 1 1 1 -1 -1 -1 -1 -1 -1 -1 -1 -1
-1 1 1 1 0 0 1 1 1 0 -1 -1]
result:
[ 1 1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 1 1 1 1 1 1 1 1 1 -1 -1 -1
-1 -1 -1 -1 -1 -1 -1 1 1 1 1 1 1 1 1 -1 -1 -1 -1 -1 -1 -1 -1 -1
-1 1 1 1 1 1 1 1 1 1 -1 -1]
那么可以得到从-1跳到1的索引如下:
np.nonzero(result[1:] > result[:-1])[0] + 1
它给出:
array([12, 31, 49])
我一直在开发一个名为 ufunclab
that includes the function hysteresis_relay
的软件包,它可以满足您的需求。我没有把它放在 PyPI 上,所以你必须获取源代码并自己构建才能使用它。
In [122]: import numpy as np
In [123]: from ufunclab import hysteresis_relay
In [124]: arr = np.linspace(-10, 10, 60)
In [125]: sample_values = np.sin(arr) + 0.6 * np.sin(arr*3)
In [126]: hysteresis_relay(sample_values, 0.0, 0.6, -1, 1, 1).astype(int)
Out[126]:
array([ 1, 1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 1, 1, 1, 1, 1,
1, 1, 1, 1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 1, 1, 1,
1, 1, 1, 1, 1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 1, 1,
1, 1, 1, 1, 1, 1, 1, -1, -1])
另一种选择是使用Pandas(但我怀疑@bb1 的答案会比这更有效,并且@bb1 的答案避免依赖于另一个库)。
- 将
combined
转换为 Pandas 系列。 - 用系列中的
pd.NA
替换 0。 - 使用方法
fillna()
和method='ffill'
来“向前填充”NA
值。 - 使用
to_numpy()
方法将系列转换回 NumPy 数组。
In [107]: combined
Out[107]:
array([ 1, 1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 0, 1, 1, 1, 0, 0,
1, 1, 1, 0, -1, -1, -1, -1, -1, -1, -1, -1, -1, 0, 1, 1, 1,
0, 0, 1, 1, 1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 1, 1,
1, 0, 0, 1, 1, 1, 0, -1, -1])
In [108]: import pandas as pd
In [109]: pd.Series(combined).replace(0, pd.NA).fillna(method='ffill').to_numpy()
Out[109]:
array([ 1, 1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 1, 1, 1, 1, 1,
1, 1, 1, 1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 1, 1, 1,
1, 1, 1, 1, 1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 1, 1,
1, 1, 1, 1, 1, 1, 1, -1, -1])
这是另一个简单的解决方案:
def gen(arr, start=0):
y = start
for x in arr:
if x != 0:
y = x
yield y
g = gen(combined)
# set count for performance
np.fromiter(g, dtype=int, count=combined.size)
>>> array([ 1, 1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 1, 1, 1, 1, 1,
1, 1, 1, 1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 1, 1, 1,
1, 1, 1, 1, 1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 1, 1,
1, 1, 1, 1, 1, 1, 1, -1, -1])
你可以写一个类似的生成器或循环来直接检测跳转:
p = 0
for i, x in enumerate(combined):
if x - p == 2:
print(i)
break
if x != 0:
p = x
combined[i-5:i+1]
>>> 12
>>> array([-1, -1, -1, -1, 0, 1])
谢谢大家的回答。
作为记录,以下是所提议解决方案的时间结果:
import numpy as np
import pandas as pd
import ufunclab
arr = np.linspace(-10, 10, 600)
values = np.sin(arr)+0.6*np.sin(arr*3)
def trigger_using_greater(values):
# This doesn't give correct results
combined = 1 * (values > 0.6) - 1 * (values < 0)
return 1 - 2 * np.greater.accumulate(combined)
def trigger_using_masked_indexes(values):
combined = 1 * (values > 0.6) - 1 * (values < 0)
mask = combined != 0
idx = np.where(mask, np.arange(len(mask)), 0)
idx = np.maximum.accumulate(idx)
return combined[idx]
def trigger_using_hysteresis_relay(values):
result = ufunclab.hysteresis_relay(values, 0.0, 0.6, -1, 1, 1).astype(int)
return result
def trigger_using_pandas(values):
combined = 1 * (values > 0.6) - 1 * (values < 0)
result = pd.Series(combined).replace(0, pd.NA).fillna(method='ffill').to_numpy()
return result
def gen(arr, start=0):
y = start
for x in arr:
if x != 0:
y = x
yield y
def trigger_using_generator(values):
combined = 1 * (values > 0.6) - 1 * (values < 0)
g = gen(combined)
return np.fromiter(g, dtype=int, count=combined.size)
In [8]: %timeit trigger_using_greater(values)
21.9 µs ± 1.3 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
In [9]: %timeit trigger_using_masked_indexes(values)
26.8 µs ± 563 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
In [10]: %timeit trigger_using_hysteresis_relay(values)
7.31 µs ± 759 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
In [11]: %timeit trigger_using_pandas(values)
755 µs ± 63.2 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
In [12]: %timeit trigger_using_generator(values)
165 µs ± 3.37 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
hysteresis_relay
是明显的赢家,但以编译 ufunclab
包为代价。顺便说一句,非常有用的包。 Warren,考虑将其发布到 PyPI。理想情况下,我希望至少看到其中一些功能集成到 SciPy.
屏蔽索引解决方案几乎和我原来的(不起作用的)解决方案一样快,并且不需要外部库。
Pandas 解决方案出奇地慢,甚至比标准 Python 生成器还要慢。