逐行数组的条件组合

Conditional combination of arrays row by row

任务是根据两个对应向量的乘法结果逐行组合两个数组(构造排列)。如:

Row1_A, Row2_A, Row3_A,

Row1_B, Row2_B, Row3_B,

结果应该是:Row1_A_Row1_B, Row1_A_Row2_B, Row1_A_Row3_B, Row2_A_Row1_B,等等..

给定以下初始数组:

n_rows = 1000
A = np.random.randint(10, size=(n_rows, 5))
B = np.random.randint(10, size=(n_rows, 5))
P_A = np.random.rand(n_rows, 1)
P_B = np.random.rand(n_rows, 1)

数组 P_A 和 P_B 是包含浮点数的各个数组的对应向量。如果乘法超过某个阈值,组合的行应该只出现在最终数组中,例如:

lim = 0.8

我想到了以下功能或方法来解决这个问题,但我会对更快的解决方案感兴趣。我愿意使用 numba 或其他库,但理想情况下我想使用 numpy 改进矢量化解决方案。

方法一

def concatenate_per_row(A, B):
    m1,n1 = A.shape
    m2,n2 = B.shape

    out = np.zeros((m1,m2,n1+n2),dtype=A.dtype)
    out[:,:,:n1] = A[:,None,:]
    out[:,:,n1:] = B
    return out.reshape(m1*m2,-1)

%%timeit

A_B = concatenate_per_row(A, B)
P_A_B = (P_A[:, None]*P_B[None, :])
P_A_B = P_A_B.flatten()

idx = P_A_B > lim

A_B = A_B[idx, :]
P_A_B = P_A_B[idx]

每个循环 37.8 ms ± 660 µs(7 次运行的平均值 ± 标准偏差,每次 10 次循环)

方法二

%%timeit

A_B = []
P_A_B = []

for i in range(len(P_A)):
    P_A_B_i = P_A[i]*P_B
    idx = np.where(P_A_B_i > lim)[0]
    
    if len(idx) > 0:
        P_A_B.append(P_A_B_i[idx])

        A_B_i = np.zeros((len(idx), A.shape[1] + B.shape[1]), dtype='int')
        A_B_i[:, :A.shape[1]] = A[i]
        A_B_i[:, A.shape[1]:] = B[idx, :]
        A_B.append(A_B_i)

A_B = np.concatenate(A_B)
P_A_B = np.concatenate(P_A_B)

每个循环 9.65 毫秒 ± 291 µs(7 次运行的平均值 ± 标准偏差,每次 100 次循环)

首先,有一个更高效的算法。实际上,您可以预先计算输出数组的大小,以便可以将值 直接写入 到最终输出数组中,而不是临时存储在列表中。要有效地找到大小,您可以 排序 数组 P_B 然后进行 二分搜索 以便找到更大的值比 lim/P_A[i,0] 所有可能的 iP_B*P_A[i,0] > lim 等同于 P_B > lim/P_A[i,0])。每个 i 筛选的项目数可以临时存储,以便快速循环筛选的项目。

此外,您可以使用 Numba 来显着加快主循环的计算速度。

这是结果代码:

@nb.njit('(int_[:,::1], int_[:,::1], float64[:,::1], float64[:,::1])')
def compute(A, B, P_A, P_B):
    assert P_A.shape[1] == 1
    assert P_B.shape[1] == 1

    P_B_sorted = np.sort(P_B.reshape(P_B.size))
    counts = len(P_B) - np.searchsorted(P_B_sorted, lim/P_A[:,0], side='right')
    n = np.sum(counts)
    mA, mB = A.shape[1], B.shape[1]
    m = mA + mB

    A_B = np.empty((n, m), dtype=np.int_)
    P_A_B = np.empty((n, 1), dtype=np.float64)
    k = 0

    for i in range(P_A.shape[0]):
        if counts[i] > 0:
            idx = np.where(P_B > lim/P_A[i, 0])[0]
            assert counts[i] == len(idx)

            start, end = k, k + counts[i]
            A_B[start:end, :mA] = A[i, :]
            A_B[start:end, mA:] = B[idx, :]
            P_A_B[start:end, :] = P_B[idx, :] * P_A[i, 0]
            k += counts[i]

    return A_B, P_A_B

以下是我机器上的性能结果:

Original:                35.6 ms
Optimized original:      18.2 ms
Proposed (with order):    0.9 ms
Proposed (no ordering):   0.3 ms

上面提出的算法比原来的优化算法快20倍。它可以做得更快。实际上,如果 项目 的顺序无关紧要,您可以使用 argsort 来重新排序 BP_B。这使您不必在热循环中每次都计算 idx 和 select 直接计算 BP_B 中的最后一个元素(保证高于阈值但与原始代码的顺序不同)。因为 selected 项目连续存储在内存中,所以这个实现要快得多。最后,最后一个实现比原始优化算法快 60 倍。请注意,即使没有 Numba,提议的实现也比原始实现快得多。

这里是不关心项目顺序的实现:

@nb.njit('(int_[:,::1], int_[:,::1], float64[:,::1], float64[:,::1])')
def compute(A, B, P_A, P_B):
    assert P_A.shape[1] == 1
    assert P_B.shape[1] == 1

    nA, mA = A.shape
    nB, mB = B.shape
    m = mA + mB

    order = np.argsort(P_B.reshape(nB))
    P_B_sorted = P_B[order, :]
    B_sorted = B[order, :]
    counts = nB - np.searchsorted(P_B_sorted.reshape(nB), lim/P_A[:,0], side='right')
    nRes = np.sum(counts)

    A_B = np.empty((nRes, m), dtype=np.int_)
    P_A_B = np.empty((nRes, 1), dtype=np.float64)
    k = 0

    for i in range(P_A.shape[0]):
        if counts[i] > 0:
            start, end = k, k + counts[i]
            A_B[start:end, :mA] = A[i, :]
            A_B[start:end, mA:] = B_sorted[nB-counts[i]:, :]
            P_A_B[start:end, :] = P_B_sorted[nB-counts[i]:, :] * P_A[i, 0]
            k += counts[i]

    return A_B, P_A_B