逐行数组的条件组合
Conditional combination of arrays row by row
任务是根据两个对应向量的乘法结果逐行组合两个数组(构造排列)。如:
Row1_A,
Row2_A,
Row3_A,
Row1_B,
Row2_B,
Row3_B,
结果应该是:Row1_A_Row1_B, Row1_A_Row2_B, Row1_A_Row3_B, Row2_A_Row1_B,等等..
给定以下初始数组:
n_rows = 1000
A = np.random.randint(10, size=(n_rows, 5))
B = np.random.randint(10, size=(n_rows, 5))
P_A = np.random.rand(n_rows, 1)
P_B = np.random.rand(n_rows, 1)
数组 P_A 和 P_B 是包含浮点数的各个数组的对应向量。如果乘法超过某个阈值,组合的行应该只出现在最终数组中,例如:
lim = 0.8
我想到了以下功能或方法来解决这个问题,但我会对更快的解决方案感兴趣。我愿意使用 numba 或其他库,但理想情况下我想使用 numpy 改进矢量化解决方案。
方法一
def concatenate_per_row(A, B):
m1,n1 = A.shape
m2,n2 = B.shape
out = np.zeros((m1,m2,n1+n2),dtype=A.dtype)
out[:,:,:n1] = A[:,None,:]
out[:,:,n1:] = B
return out.reshape(m1*m2,-1)
%%timeit
A_B = concatenate_per_row(A, B)
P_A_B = (P_A[:, None]*P_B[None, :])
P_A_B = P_A_B.flatten()
idx = P_A_B > lim
A_B = A_B[idx, :]
P_A_B = P_A_B[idx]
每个循环 37.8 ms ± 660 µs(7 次运行的平均值 ± 标准偏差,每次 10 次循环)
方法二
%%timeit
A_B = []
P_A_B = []
for i in range(len(P_A)):
P_A_B_i = P_A[i]*P_B
idx = np.where(P_A_B_i > lim)[0]
if len(idx) > 0:
P_A_B.append(P_A_B_i[idx])
A_B_i = np.zeros((len(idx), A.shape[1] + B.shape[1]), dtype='int')
A_B_i[:, :A.shape[1]] = A[i]
A_B_i[:, A.shape[1]:] = B[idx, :]
A_B.append(A_B_i)
A_B = np.concatenate(A_B)
P_A_B = np.concatenate(P_A_B)
每个循环 9.65 毫秒 ± 291 µs(7 次运行的平均值 ± 标准偏差,每次 100 次循环)
首先,有一个更高效的算法。实际上,您可以预先计算输出数组的大小,以便可以将值 直接写入 到最终输出数组中,而不是临时存储在列表中。要有效地找到大小,您可以 排序 数组 P_B
然后进行 二分搜索 以便找到更大的值比 lim/P_A[i,0]
所有可能的 i
(P_B*P_A[i,0] > lim
等同于 P_B > lim/P_A[i,0]
)。每个 i
筛选的项目数可以临时存储,以便快速循环筛选的项目。
此外,您可以使用 Numba 来显着加快主循环的计算速度。
这是结果代码:
@nb.njit('(int_[:,::1], int_[:,::1], float64[:,::1], float64[:,::1])')
def compute(A, B, P_A, P_B):
assert P_A.shape[1] == 1
assert P_B.shape[1] == 1
P_B_sorted = np.sort(P_B.reshape(P_B.size))
counts = len(P_B) - np.searchsorted(P_B_sorted, lim/P_A[:,0], side='right')
n = np.sum(counts)
mA, mB = A.shape[1], B.shape[1]
m = mA + mB
A_B = np.empty((n, m), dtype=np.int_)
P_A_B = np.empty((n, 1), dtype=np.float64)
k = 0
for i in range(P_A.shape[0]):
if counts[i] > 0:
idx = np.where(P_B > lim/P_A[i, 0])[0]
assert counts[i] == len(idx)
start, end = k, k + counts[i]
A_B[start:end, :mA] = A[i, :]
A_B[start:end, mA:] = B[idx, :]
P_A_B[start:end, :] = P_B[idx, :] * P_A[i, 0]
k += counts[i]
return A_B, P_A_B
以下是我机器上的性能结果:
Original: 35.6 ms
Optimized original: 18.2 ms
Proposed (with order): 0.9 ms
Proposed (no ordering): 0.3 ms
上面提出的算法比原来的优化算法快20倍。它可以做得更快。实际上,如果 项目 的顺序无关紧要,您可以使用 argsort
来重新排序 B
和 P_B
。这使您不必在热循环中每次都计算 idx
和 select 直接计算 B
和 P_B
中的最后一个元素(保证高于阈值但与原始代码的顺序不同)。因为 selected 项目连续存储在内存中,所以这个实现要快得多。最后,最后一个实现比原始优化算法快 60 倍。请注意,即使没有 Numba,提议的实现也比原始实现快得多。
这里是不关心项目顺序的实现:
@nb.njit('(int_[:,::1], int_[:,::1], float64[:,::1], float64[:,::1])')
def compute(A, B, P_A, P_B):
assert P_A.shape[1] == 1
assert P_B.shape[1] == 1
nA, mA = A.shape
nB, mB = B.shape
m = mA + mB
order = np.argsort(P_B.reshape(nB))
P_B_sorted = P_B[order, :]
B_sorted = B[order, :]
counts = nB - np.searchsorted(P_B_sorted.reshape(nB), lim/P_A[:,0], side='right')
nRes = np.sum(counts)
A_B = np.empty((nRes, m), dtype=np.int_)
P_A_B = np.empty((nRes, 1), dtype=np.float64)
k = 0
for i in range(P_A.shape[0]):
if counts[i] > 0:
start, end = k, k + counts[i]
A_B[start:end, :mA] = A[i, :]
A_B[start:end, mA:] = B_sorted[nB-counts[i]:, :]
P_A_B[start:end, :] = P_B_sorted[nB-counts[i]:, :] * P_A[i, 0]
k += counts[i]
return A_B, P_A_B
任务是根据两个对应向量的乘法结果逐行组合两个数组(构造排列)。如:
Row1_A, Row2_A, Row3_A,
Row1_B, Row2_B, Row3_B,
结果应该是:Row1_A_Row1_B, Row1_A_Row2_B, Row1_A_Row3_B, Row2_A_Row1_B,等等..
给定以下初始数组:
n_rows = 1000
A = np.random.randint(10, size=(n_rows, 5))
B = np.random.randint(10, size=(n_rows, 5))
P_A = np.random.rand(n_rows, 1)
P_B = np.random.rand(n_rows, 1)
数组 P_A 和 P_B 是包含浮点数的各个数组的对应向量。如果乘法超过某个阈值,组合的行应该只出现在最终数组中,例如:
lim = 0.8
我想到了以下功能或方法来解决这个问题,但我会对更快的解决方案感兴趣。我愿意使用 numba 或其他库,但理想情况下我想使用 numpy 改进矢量化解决方案。
方法一
def concatenate_per_row(A, B):
m1,n1 = A.shape
m2,n2 = B.shape
out = np.zeros((m1,m2,n1+n2),dtype=A.dtype)
out[:,:,:n1] = A[:,None,:]
out[:,:,n1:] = B
return out.reshape(m1*m2,-1)
%%timeit
A_B = concatenate_per_row(A, B)
P_A_B = (P_A[:, None]*P_B[None, :])
P_A_B = P_A_B.flatten()
idx = P_A_B > lim
A_B = A_B[idx, :]
P_A_B = P_A_B[idx]
每个循环 37.8 ms ± 660 µs(7 次运行的平均值 ± 标准偏差,每次 10 次循环)
方法二
%%timeit
A_B = []
P_A_B = []
for i in range(len(P_A)):
P_A_B_i = P_A[i]*P_B
idx = np.where(P_A_B_i > lim)[0]
if len(idx) > 0:
P_A_B.append(P_A_B_i[idx])
A_B_i = np.zeros((len(idx), A.shape[1] + B.shape[1]), dtype='int')
A_B_i[:, :A.shape[1]] = A[i]
A_B_i[:, A.shape[1]:] = B[idx, :]
A_B.append(A_B_i)
A_B = np.concatenate(A_B)
P_A_B = np.concatenate(P_A_B)
每个循环 9.65 毫秒 ± 291 µs(7 次运行的平均值 ± 标准偏差,每次 100 次循环)
首先,有一个更高效的算法。实际上,您可以预先计算输出数组的大小,以便可以将值 直接写入 到最终输出数组中,而不是临时存储在列表中。要有效地找到大小,您可以 排序 数组 P_B
然后进行 二分搜索 以便找到更大的值比 lim/P_A[i,0]
所有可能的 i
(P_B*P_A[i,0] > lim
等同于 P_B > lim/P_A[i,0]
)。每个 i
筛选的项目数可以临时存储,以便快速循环筛选的项目。
此外,您可以使用 Numba 来显着加快主循环的计算速度。
这是结果代码:
@nb.njit('(int_[:,::1], int_[:,::1], float64[:,::1], float64[:,::1])')
def compute(A, B, P_A, P_B):
assert P_A.shape[1] == 1
assert P_B.shape[1] == 1
P_B_sorted = np.sort(P_B.reshape(P_B.size))
counts = len(P_B) - np.searchsorted(P_B_sorted, lim/P_A[:,0], side='right')
n = np.sum(counts)
mA, mB = A.shape[1], B.shape[1]
m = mA + mB
A_B = np.empty((n, m), dtype=np.int_)
P_A_B = np.empty((n, 1), dtype=np.float64)
k = 0
for i in range(P_A.shape[0]):
if counts[i] > 0:
idx = np.where(P_B > lim/P_A[i, 0])[0]
assert counts[i] == len(idx)
start, end = k, k + counts[i]
A_B[start:end, :mA] = A[i, :]
A_B[start:end, mA:] = B[idx, :]
P_A_B[start:end, :] = P_B[idx, :] * P_A[i, 0]
k += counts[i]
return A_B, P_A_B
以下是我机器上的性能结果:
Original: 35.6 ms
Optimized original: 18.2 ms
Proposed (with order): 0.9 ms
Proposed (no ordering): 0.3 ms
上面提出的算法比原来的优化算法快20倍。它可以做得更快。实际上,如果 项目 的顺序无关紧要,您可以使用 argsort
来重新排序 B
和 P_B
。这使您不必在热循环中每次都计算 idx
和 select 直接计算 B
和 P_B
中的最后一个元素(保证高于阈值但与原始代码的顺序不同)。因为 selected 项目连续存储在内存中,所以这个实现要快得多。最后,最后一个实现比原始优化算法快 60 倍。请注意,即使没有 Numba,提议的实现也比原始实现快得多。
这里是不关心项目顺序的实现:
@nb.njit('(int_[:,::1], int_[:,::1], float64[:,::1], float64[:,::1])')
def compute(A, B, P_A, P_B):
assert P_A.shape[1] == 1
assert P_B.shape[1] == 1
nA, mA = A.shape
nB, mB = B.shape
m = mA + mB
order = np.argsort(P_B.reshape(nB))
P_B_sorted = P_B[order, :]
B_sorted = B[order, :]
counts = nB - np.searchsorted(P_B_sorted.reshape(nB), lim/P_A[:,0], side='right')
nRes = np.sum(counts)
A_B = np.empty((nRes, m), dtype=np.int_)
P_A_B = np.empty((nRes, 1), dtype=np.float64)
k = 0
for i in range(P_A.shape[0]):
if counts[i] > 0:
start, end = k, k + counts[i]
A_B[start:end, :mA] = A[i, :]
A_B[start:end, mA:] = B_sorted[nB-counts[i]:, :]
P_A_B[start:end, :] = P_B_sorted[nB-counts[i]:, :] * P_A[i, 0]
k += counts[i]
return A_B, P_A_B