在没有循环的情况下计数 "greater" 次出现

Count "greater" occurrences without loop

假设我们有一个数组 A 如果形状 (100,)B 形状 (10,)。两者都包含 [0,1].

中的值

我们如何让 A 中的元素数大于 B 中的每个值?我期望形状为 (10,),其中第一个元素是“A 中有多少大于 B[0]”,第二个元素是“A 中有多少大于B[1]", 等...

不使用循环。

我尝试了以下方法,但没有用:

import numpy as np
import numpy.random as rdm

A = rdm.rand(100)
B = np.linspace(0,1,10)
def occ(z: float) ->float:
     return np.count_nonzero(A > z)
occ(B)

Python 不会将我的函数用作 B 上的标量函数,这就是为什么我得到:

operands could not be broadcast together with shapes (10,) (100,) 

我也试过 np.greater 但我遇到了同样的问题...

缓慢但简单

如果您不理解错误消息,则它是含糊不清的,但它会告诉您该怎么做。数组维度 broadcast 在一起,从右边缘开始排列它们。如果您将操作分为两部分,这将特别有用:

  1. 创建一个 (100, 10) 掩码,显示 A 的哪些元素大于 B 的哪些元素:

     mask = A[:, None] > B
    
  2. 将前面运算的结果沿A对应的轴求和:

     result = np.count_nonzero(mask, axis=0)
    

     result = np.sum(mask, axis=0)
    

这可以写成 one-liner:

(A[:, None] > B).sum(0)

np.count_nonzero(A[:, None] > B, axis=0)

您可以切换尺寸并将 B 放在第一个轴上以获得相同的结果:

(A > B[:, None]).sum(1)

快速而优雅

采用完全不同(但可能更有效)的方法,您可以使用 np.searchsorted:

A.sort()
result = A.size - np.searchsorted(A, B)

默认情况下,searchsorted returns left-index 即 B 的每个元素都会被插入到 A 处。这几乎可以立即告诉您 A 中有多少元素大于该值。

基准测试

此处,算法标记如下:

  • B0: (A[:, None] > B).sum(0)
  • B1: (A > B[:, None]).sum(1)
  • HH: np.cumsum(np.histogram(A, bins=B)[0][::-1])[::-1]
  • SS: A.sort(); A.size - np.searchsorted(A, B)
+--------+--------+----------------------------------------+
| A.size | B.size |        Time (B0 / B1 / HH / SS)        |
+--------+--------+----------------------------------------+
|    100 |     10 |  20.9 µs / 15.7 µs / 68.3 µs / 8.87 µs |
+--------+--------+----------------------------------------+
|   1000 |     10 |   118 µs / 57.2 µs /  139 µs / 17.8 µs |
+--------+--------+----------------------------------------+
|  10000 |     10 |   987 µs /  288 µs / 1.23 ms /  131 µs |
+--------+--------+----------------------------------------+
| 100000 |     10 |  9.48 ms / 2.77 ms / 13.4 ms / 1.42 ms |
+--------+--------+----------------------------------------+
|    100 |    100 |  70.7 µs / 63.8 µs /   71 µs / 11.4 µs |
+--------+--------+----------------------------------------+
|   1000 |    100 |   518 µs /  388 µs /  148 µs / 21.6 µs |
+--------+--------+----------------------------------------+
|  10000 |    100 |  4.91 ms / 2.67 ms / 1.22 ms /  137 µs |
+--------+--------+----------------------------------------+
| 100000 |    100 |  57.4 ms / 35.6 ms / 13.5 ms / 1.42 ms |
+--------+--------+----------------------------------------+

内存布局很重要。 B1 总是比 B0 快。发生这种情况是因为对连续(缓存的)元素(沿着 C-order 中的最后一个轴)求和总是比必须跳过行来获取下一个元素更快。对于 B 的小值,广播表现良好。请记住,B0B1 的时间和 space 复杂度都是 O(A.size * B.size)。两个直方图解决方案的复杂度应该是 O(A.size * log(A.size)),但是 SSHH 更有效地实现,因为它可以假设更多关于数据的事情。

我认为你可以使用 np.histogram 来完成这项工作

A = rdm.rand(100)
B = np.linspace(0,1,10)
np.histogram(A, bins=B)[0]

给出输出

array([10,  9,  8, 11,  9, 14, 10, 12, 17])

B[9] 将始终为空,因为没有大于 1 的值。

并向后计算 cumsum

np.cumsum(np.histogram(A, bins=B)[0][::-1])[::-1]

输出

array([100,  90,  81,  73,  62,  53,  39,  29,  17])
np.sum(A>B.reshape((-1,1)), axis=1)

说明

需要理解 broadcasting 并为此进行整形。通过将 B 重塑为形状 (len(B), 1),它可以与 A 一起广播以生成包含所有比较的形状为 (len(B), len(A)) 的数组。然后对轴 1(沿 A)求和。

换句话说,A < B 不起作用,因为 A 有 100 个条目,而 B 有 10 个。如果你阅读 broadcasting rules,你会看到 numpy 将以最后一个维度,如果它们大小相同,那么它可以比较one-to-one。如果两者之一是 1,则此维度被 拉伸或“复制”以匹配另一个 。如果它们不相等且其中 none 等于 1,则失败。

举个更短的例子:

A = np.array([0.5112744 , 0.21696187, 0.14710105, 0.98581087, 0.50053359,
              0.54954654, 0.81217522, 0.50009166, 0.42990167, 0.56078499])
B = array([0.25, 0.5 , 0.75])

(A>B.reshape((-1,1))) 的转置(为了便于阅读)

np.array([[ True,  True, False],
          [False, False, False],
          [False, False, False],
          [ True,  True,  True],
          [ True,  True, False],
          [ True,  True, False],
          [ True,  True,  True],
          [ True,  True, False],
          [ True, False, False],
          [ True,  True, False]])

并且np.sum(A>B.reshape((-1,1)), axis=1)

array([8, 7, 2])