Numpy 全矢量化元素有序减法
Numpy ordered subtraction of elements with full vectorization
想象一个 mxn
数组 a
和一个 1xn
数组 b
,我们想从 a
中减去 b
以便从 a
的第一个元素中减去 b
,然后从 a[1]
中减去零和 b-a[0]
的最大值,依此类推...
所以:
x = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
a = np.repeat(x, 100000).reshape(10, 100000)
b = np.repeat(np.array([5]), 100000).reshape(1, 100000)
所以我们要得到:[ 0, 0, 1, 4, 5, 6, 7, 8, 9, 10]
,重复10万次。
我已经管理了下面的函数,它提供了期望的结果:
def func(a, b):
n = np.copy(a)
m = np.copy(b)
for i in range(len(n)):
n[i] = np.where(n[i] >= m, n[i] - m, 0)
m = np.maximum(0, m - a[i])
if not m.any():
return n
return n
但是,它没有完全矢量化。所以:
>> timeit func(a, b)
3.23 ms ± 52.3 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
理想情况下,我想摆脱 for 循环并使其尽可能矢量化。
谢谢。
我认为你可以像这样向量化你的函数:
import numpy as np
def func_vec(a, b):
ar = np.roll(a, 1, axis=0)
ar[0] = 0
ac = np.cumsum(ar, axis=0)
bc = np.maximum(b - ac, 0)
return np.maximum(a - bc, 0)
快速测试:
import numpy as np
def func(a, b):
n = np.copy(a)
m = np.copy(b)
for i in range(len(n)):
n[i] = np.where(n[i] >= m, n[i] - m, 0)
m = np.maximum(0, m - a[i])
if not m.any():
return n
return n
np.random.seed(100)
n = 100000
m = 10
num_max = 100
a = np.random.randint(num_max, size=(m, n))
b = np.random.randint(num_max, size=(1, n))
print(np.all(func(a, b) == func_vec(a, b)))
# True
但是,你的算法比向量化算法有一个重要的优势,就是当它发现没有别的东西可以减去时,它会停止迭代。这意味着,根据问题的大小和具体值(决定提前停止何时发生的因素,如果有的话),矢量化解决方案实际上可能更慢。上面的例子见:
%timeit func(a, b)
# 5.09 ms ± 78.5 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
%timeit func_vec(a, b)
# 12.4 ms ± 939 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
但是,您可以使用 Numba 获得某种 "best of both worlds" 解决方案:
import numpy as np
import numba as nb
@nb.njit
def func_nb(a, b):
n = np.copy(a)
m = np.copy(b)
zero = np.array(0, dtype=a.dtype)
for i in range(len(n)):
n[i] = np.maximum(n[i] - m, zero)
m = np.maximum(zero, m - a[i])
if not m.any():
return n
return n
print(np.all(func(a, b) == func_nb(a, b)))
# True
%timeit func_nb(a, b)
# 3.36 ms ± 461 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
想象一个 mxn
数组 a
和一个 1xn
数组 b
,我们想从 a
中减去 b
以便从 a
的第一个元素中减去 b
,然后从 a[1]
中减去零和 b-a[0]
的最大值,依此类推...
所以:
x = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
a = np.repeat(x, 100000).reshape(10, 100000)
b = np.repeat(np.array([5]), 100000).reshape(1, 100000)
所以我们要得到:[ 0, 0, 1, 4, 5, 6, 7, 8, 9, 10]
,重复10万次。
我已经管理了下面的函数,它提供了期望的结果:
def func(a, b):
n = np.copy(a)
m = np.copy(b)
for i in range(len(n)):
n[i] = np.where(n[i] >= m, n[i] - m, 0)
m = np.maximum(0, m - a[i])
if not m.any():
return n
return n
但是,它没有完全矢量化。所以:
>> timeit func(a, b)
3.23 ms ± 52.3 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
理想情况下,我想摆脱 for 循环并使其尽可能矢量化。
谢谢。
我认为你可以像这样向量化你的函数:
import numpy as np
def func_vec(a, b):
ar = np.roll(a, 1, axis=0)
ar[0] = 0
ac = np.cumsum(ar, axis=0)
bc = np.maximum(b - ac, 0)
return np.maximum(a - bc, 0)
快速测试:
import numpy as np
def func(a, b):
n = np.copy(a)
m = np.copy(b)
for i in range(len(n)):
n[i] = np.where(n[i] >= m, n[i] - m, 0)
m = np.maximum(0, m - a[i])
if not m.any():
return n
return n
np.random.seed(100)
n = 100000
m = 10
num_max = 100
a = np.random.randint(num_max, size=(m, n))
b = np.random.randint(num_max, size=(1, n))
print(np.all(func(a, b) == func_vec(a, b)))
# True
但是,你的算法比向量化算法有一个重要的优势,就是当它发现没有别的东西可以减去时,它会停止迭代。这意味着,根据问题的大小和具体值(决定提前停止何时发生的因素,如果有的话),矢量化解决方案实际上可能更慢。上面的例子见:
%timeit func(a, b)
# 5.09 ms ± 78.5 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
%timeit func_vec(a, b)
# 12.4 ms ± 939 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
但是,您可以使用 Numba 获得某种 "best of both worlds" 解决方案:
import numpy as np
import numba as nb
@nb.njit
def func_nb(a, b):
n = np.copy(a)
m = np.copy(b)
zero = np.array(0, dtype=a.dtype)
for i in range(len(n)):
n[i] = np.maximum(n[i] - m, zero)
m = np.maximum(zero, m - a[i])
if not m.any():
return n
return n
print(np.all(func(a, b) == func_nb(a, b)))
# True
%timeit func_nb(a, b)
# 3.36 ms ± 461 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)