Python Numpy 向量化组合学的嵌套 for 循环
Python Numpy vectorize nested for-loops for combinatorics
给定一个正实数的 nxn 数组 A,我试图找到二维数组三行的所有组合的元素最小值的最大值中的最小值。使用 for 循环,结果是这样的:
import numpy as np
n = 100
np.random.seed(2)
A = np.random.rand(n,n)
global_best = np.inf
for i in range(n-2):
for j in range(i+1, n-1):
for k in range(j+1, n):
# find the maximum of the element-wise minimum of the three vectors
local_best = np.amax(np.array([A[i,:], A[j,:], A[k,:]]).min(0))
# if local_best is lower than global_best, update global_best
if (local_best < global_best):
global_best = local_best
save_rows = [i, j, k]
print global_best, save_rows
在 n = 100
的情况下,输出应该是这样的:
Out[]: 0.492652949593 [6, 41, 58]
虽然我有一种感觉,但我可以使用 Numpy 向量化更快地完成此操作,并且当然会感谢任何帮助。谢谢
您可以使用 itertools
中的 combinations,这是一个 python 标准库,它将帮助您删除所有那些嵌套循环。
from itertools import combinations
import numpy as np
n = 100
np.random.seed(2)
A = np.random.rand(n,n)
global_best = 1000000000000000.0
for i, j, k in combinations(range(n), 3):
local_best = np.amax(np.array([A[i,:], A[j,:], A[k,:]]).min(0))
if local_best < global_best:
global_best = local_best
save_rows = [i, j, k]
print global_best, save_rows
对于 n=100
:
,此解决方案快 5 倍
coms = np.fromiter(itertools.combinations(np.arange(n), 3), 'i,i,i').view(('i', 3))
best = A[coms].min(1).max(1)
at = best.argmin()
global_best = best[at]
save_rows = coms[at]
第一行有点复杂,但将 itertools.combinations
的结果转换为包含所有可能的 [i,j,k]
索引组合的 NumPy 数组。
从那里开始,只需使用所有可能的索引组合对 A
进行索引,然后沿适当的轴进行缩减即可。
此解决方案在构建所有可能组合的具体数组时会消耗更多内存 A[coms]
。它为小 n
节省时间,比如 250 以下,但对于大 n
内存流量会非常高,并且可能比原始代码慢。
按块工作可以结合矢量化微积分的速度,同时避免 运行 进入内存错误。下面是一个将嵌套循环转换为按块矢量化的示例。
从与问题相同的变量开始,定义块长度,以便向量化块内的计算并仅在块而不是组合上循环。
chunk = 2000 # define chunk length, if to small, the code won't take advantage
# of vectorization, if it is too large, excessive memory usage will
# slow down execution, or Memory Error will be risen
combinations = itertools.combinations(range(n),3) # generate iterator containing
# all possible combinations of 3 columns
N = n*(n-1)*(n-2)//6 # number of combinations (length of combinations cannot be
# retrieved because it is an iterator)
# generate a list containing how many elements of combinations will be retrieved
# per iteration
n_chunks, remainder = divmod(N,chunk)
counts_list = [chunk for _ in range(n_chunks)]
if remainder:
counts_list.append(remainder)
# Iterate one chunk at a time, using vectorized code to treat the chunk
for counts in counts_list:
# retrieve combinations in current chunk
current_comb = np.fromiter(combinations,dtype='i,i,i',count=counts)\
.view(('i',3))
# maximum of element-wise minimum in current chunk
chunk_best = np.minimum(np.minimum(A[current_comb[:,0],:],A[current_comb[:,1],:]),
A[current_comb[:,2],:]).max(axis=1)
ravel_save_row = chunk_best.argmin() # minimum of maximums in current chunk
# check if current chunk contains global minimum
if chunk_best[ravel_save_row] < global_best:
global_best = chunk_best[ravel_save_row]
save_rows = current_comb[ravel_save_row]
print(global_best,save_rows)
我运行与嵌套循环进行了一些性能比较,得到如下结果(chunk_length = 1000):
- n=100
- 嵌套循环:1.13 秒 ± 16.6 毫秒
- 分块工作:108 毫秒 ± 565 微秒
- n=150
- 嵌套循环:4.16 秒 ± 39.3 毫秒
- 分块工作:523 毫秒 ± 4.75 毫秒
- n=500
- 嵌套循环:3min 18s ± 3.21 s
- 分块工作:1 分 12 秒 ± 1.6 秒
备注
分析代码后,我发现 np.min
花费的时间最长 calling np.maximum.reduce
。我将它直接转换为 np.maximum
,这稍微提高了性能。
不要尝试对不容易向量化的循环进行向量化。而是使用像 Numba 这样的 jit 编译器或使用 Cython。如果生成的代码更具可读性,则矢量化解决方案是好的,但就性能而言,编译的解决方案通常更快,或者在最坏的情况下与矢量化解决方案一样快(BLAS 例程除外)。
单线程示例
import numba as nb
import numpy as np
#Min and max library calls may be costly for only 3 values
@nb.njit()
def max_min_3(A,B,C):
max_of_min=-np.inf
for i in range(A.shape[0]):
loc_min=A[i]
if (B[i]<loc_min):
loc_min=B[i]
if (C[i]<loc_min):
loc_min=C[i]
if (max_of_min<loc_min):
max_of_min=loc_min
return max_of_min
@nb.njit()
def your_func(A):
n=A.shape[0]
save_rows=np.zeros(3,dtype=np.uint64)
global_best=np.inf
for i in range(n):
for j in range(i+1, n):
for k in range(j+1, n):
# find the maximum of the element-wise minimum of the three vectors
local_best = max_min_3(A[i,:], A[j,:], A[k,:])
# if local_best is lower than global_best, update global_best
if (local_best < global_best):
global_best = local_best
save_rows[0] = i
save_rows[1] = j
save_rows[2] = k
return global_best, save_rows
单线程版本的性能
n=100
your_version: 1.56s
compiled_version: 0.0168s (92x speedup)
n=150
your_version: 5.41s
compiled_version: 0.08122s (66x speedup)
n=500
your_version: 283s
compiled_version: 8.86s (31x speedup)
第一次调用有一个恒定的开销大约0.3-1s。对于计算时间本身的性能测量,调用一次,然后测量性能。
通过一些代码更改,此任务也可以并行化。
多线程示例
@nb.njit(parallel=True)
def your_func(A):
n=A.shape[0]
all_global_best=np.inf
rows=np.empty((3),dtype=np.uint64)
save_rows=np.empty((n,3),dtype=np.uint64)
global_best_Temp=np.empty((n),dtype=A.dtype)
global_best_Temp[:]=np.inf
for i in range(n):
for j in nb.prange(i+1, n):
row_1=0
row_2=0
row_3=0
global_best=np.inf
for k in range(j+1, n):
# find the maximum of the element-wise minimum of the three vectors
local_best = max_min_3(A[i,:], A[j,:], A[k,:])
# if local_best is lower than global_best, update global_best
if (local_best < global_best):
global_best = local_best
row_1 = i
row_2 = j
row_3 = k
save_rows[j,0]=row_1
save_rows[j,1]=row_2
save_rows[j,2]=row_3
global_best_Temp[j]=global_best
ind=np.argmin(global_best_Temp)
if (global_best_Temp[ind]<all_global_best):
rows[0] = save_rows[ind,0]
rows[1] = save_rows[ind,1]
rows[2] = save_rows[ind,2]
all_global_best=global_best_Temp[ind]
return all_global_best, rows
多线程版本的性能
n=100
your_version: 1.56s
compiled_version: 0.0078s (200x speedup)
n=150
your_version: 5.41s
compiled_version: 0.0282s (191x speedup)
n=500
your_version: 283s
compiled_version: 2.95s (96x speedup)
编辑
在较新的 Numba 版本(通过 Anaconda Python 发行版安装)中,我必须手动安装 tbb
才能获得工作并行化。
给定一个正实数的 nxn 数组 A,我试图找到二维数组三行的所有组合的元素最小值的最大值中的最小值。使用 for 循环,结果是这样的:
import numpy as np
n = 100
np.random.seed(2)
A = np.random.rand(n,n)
global_best = np.inf
for i in range(n-2):
for j in range(i+1, n-1):
for k in range(j+1, n):
# find the maximum of the element-wise minimum of the three vectors
local_best = np.amax(np.array([A[i,:], A[j,:], A[k,:]]).min(0))
# if local_best is lower than global_best, update global_best
if (local_best < global_best):
global_best = local_best
save_rows = [i, j, k]
print global_best, save_rows
在 n = 100
的情况下,输出应该是这样的:
Out[]: 0.492652949593 [6, 41, 58]
虽然我有一种感觉,但我可以使用 Numpy 向量化更快地完成此操作,并且当然会感谢任何帮助。谢谢
您可以使用 itertools
中的 combinations,这是一个 python 标准库,它将帮助您删除所有那些嵌套循环。
from itertools import combinations
import numpy as np
n = 100
np.random.seed(2)
A = np.random.rand(n,n)
global_best = 1000000000000000.0
for i, j, k in combinations(range(n), 3):
local_best = np.amax(np.array([A[i,:], A[j,:], A[k,:]]).min(0))
if local_best < global_best:
global_best = local_best
save_rows = [i, j, k]
print global_best, save_rows
对于 n=100
:
coms = np.fromiter(itertools.combinations(np.arange(n), 3), 'i,i,i').view(('i', 3))
best = A[coms].min(1).max(1)
at = best.argmin()
global_best = best[at]
save_rows = coms[at]
第一行有点复杂,但将 itertools.combinations
的结果转换为包含所有可能的 [i,j,k]
索引组合的 NumPy 数组。
从那里开始,只需使用所有可能的索引组合对 A
进行索引,然后沿适当的轴进行缩减即可。
此解决方案在构建所有可能组合的具体数组时会消耗更多内存 A[coms]
。它为小 n
节省时间,比如 250 以下,但对于大 n
内存流量会非常高,并且可能比原始代码慢。
按块工作可以结合矢量化微积分的速度,同时避免 运行 进入内存错误。下面是一个将嵌套循环转换为按块矢量化的示例。
从与问题相同的变量开始,定义块长度,以便向量化块内的计算并仅在块而不是组合上循环。
chunk = 2000 # define chunk length, if to small, the code won't take advantage
# of vectorization, if it is too large, excessive memory usage will
# slow down execution, or Memory Error will be risen
combinations = itertools.combinations(range(n),3) # generate iterator containing
# all possible combinations of 3 columns
N = n*(n-1)*(n-2)//6 # number of combinations (length of combinations cannot be
# retrieved because it is an iterator)
# generate a list containing how many elements of combinations will be retrieved
# per iteration
n_chunks, remainder = divmod(N,chunk)
counts_list = [chunk for _ in range(n_chunks)]
if remainder:
counts_list.append(remainder)
# Iterate one chunk at a time, using vectorized code to treat the chunk
for counts in counts_list:
# retrieve combinations in current chunk
current_comb = np.fromiter(combinations,dtype='i,i,i',count=counts)\
.view(('i',3))
# maximum of element-wise minimum in current chunk
chunk_best = np.minimum(np.minimum(A[current_comb[:,0],:],A[current_comb[:,1],:]),
A[current_comb[:,2],:]).max(axis=1)
ravel_save_row = chunk_best.argmin() # minimum of maximums in current chunk
# check if current chunk contains global minimum
if chunk_best[ravel_save_row] < global_best:
global_best = chunk_best[ravel_save_row]
save_rows = current_comb[ravel_save_row]
print(global_best,save_rows)
我运行与嵌套循环进行了一些性能比较,得到如下结果(chunk_length = 1000):
- n=100
- 嵌套循环:1.13 秒 ± 16.6 毫秒
- 分块工作:108 毫秒 ± 565 微秒
- n=150
- 嵌套循环:4.16 秒 ± 39.3 毫秒
- 分块工作:523 毫秒 ± 4.75 毫秒
- n=500
- 嵌套循环:3min 18s ± 3.21 s
- 分块工作:1 分 12 秒 ± 1.6 秒
备注
分析代码后,我发现 np.min
花费的时间最长 calling np.maximum.reduce
。我将它直接转换为 np.maximum
,这稍微提高了性能。
不要尝试对不容易向量化的循环进行向量化。而是使用像 Numba 这样的 jit 编译器或使用 Cython。如果生成的代码更具可读性,则矢量化解决方案是好的,但就性能而言,编译的解决方案通常更快,或者在最坏的情况下与矢量化解决方案一样快(BLAS 例程除外)。
单线程示例
import numba as nb
import numpy as np
#Min and max library calls may be costly for only 3 values
@nb.njit()
def max_min_3(A,B,C):
max_of_min=-np.inf
for i in range(A.shape[0]):
loc_min=A[i]
if (B[i]<loc_min):
loc_min=B[i]
if (C[i]<loc_min):
loc_min=C[i]
if (max_of_min<loc_min):
max_of_min=loc_min
return max_of_min
@nb.njit()
def your_func(A):
n=A.shape[0]
save_rows=np.zeros(3,dtype=np.uint64)
global_best=np.inf
for i in range(n):
for j in range(i+1, n):
for k in range(j+1, n):
# find the maximum of the element-wise minimum of the three vectors
local_best = max_min_3(A[i,:], A[j,:], A[k,:])
# if local_best is lower than global_best, update global_best
if (local_best < global_best):
global_best = local_best
save_rows[0] = i
save_rows[1] = j
save_rows[2] = k
return global_best, save_rows
单线程版本的性能
n=100
your_version: 1.56s
compiled_version: 0.0168s (92x speedup)
n=150
your_version: 5.41s
compiled_version: 0.08122s (66x speedup)
n=500
your_version: 283s
compiled_version: 8.86s (31x speedup)
第一次调用有一个恒定的开销大约0.3-1s。对于计算时间本身的性能测量,调用一次,然后测量性能。
通过一些代码更改,此任务也可以并行化。
多线程示例
@nb.njit(parallel=True)
def your_func(A):
n=A.shape[0]
all_global_best=np.inf
rows=np.empty((3),dtype=np.uint64)
save_rows=np.empty((n,3),dtype=np.uint64)
global_best_Temp=np.empty((n),dtype=A.dtype)
global_best_Temp[:]=np.inf
for i in range(n):
for j in nb.prange(i+1, n):
row_1=0
row_2=0
row_3=0
global_best=np.inf
for k in range(j+1, n):
# find the maximum of the element-wise minimum of the three vectors
local_best = max_min_3(A[i,:], A[j,:], A[k,:])
# if local_best is lower than global_best, update global_best
if (local_best < global_best):
global_best = local_best
row_1 = i
row_2 = j
row_3 = k
save_rows[j,0]=row_1
save_rows[j,1]=row_2
save_rows[j,2]=row_3
global_best_Temp[j]=global_best
ind=np.argmin(global_best_Temp)
if (global_best_Temp[ind]<all_global_best):
rows[0] = save_rows[ind,0]
rows[1] = save_rows[ind,1]
rows[2] = save_rows[ind,2]
all_global_best=global_best_Temp[ind]
return all_global_best, rows
多线程版本的性能
n=100
your_version: 1.56s
compiled_version: 0.0078s (200x speedup)
n=150
your_version: 5.41s
compiled_version: 0.0282s (191x speedup)
n=500
your_version: 283s
compiled_version: 2.95s (96x speedup)
编辑
在较新的 Numba 版本(通过 Anaconda Python 发行版安装)中,我必须手动安装 tbb
才能获得工作并行化。