计算字符串矩阵之间相似性内核的最快方法?

Fastest way to calculate the similarity kernel between matrices of strings?

假设我有一个名为 data 的包含 5 个矩阵的列表。每个矩阵都有任意数量的行,但只有 3 列包含 3 个字符串。我想训练一个高斯过程模型,假设 data 是我的训练集。我想根据每对矩阵的字符串匹配来计算相似度核。假设这 5 个矩阵如下所示:

import numpy as np

data = np.array([# Matrix 0
                 [['b', 'a', 'c'], 
                  ['a', 'a', 'b'], 
                  ['d', 'c', 'c'], 
                  ['a', 'b', 'd']],
             # Matrix 1
             [['d', 'a', 'c'], 
              ['a', 'b', 'c'], 
              ['a', 'd', 'd']],
         # Matrix 2
         [['a', 'b', 'b'], 
          ['d', 'a', 'd'], 
          ['d', 'b', 'a'], 
          ['c', 'b', 'd']],
     # Matrix 3
     [['b', 'b', 'c'], 
      ['a', 'b', 'b'], 
      ['a', 'c', 'a'], 
      ['c', 'b', 'a'], 
      ['b', 'd', 'd']],
 # Matrix 4
 [['a', 'b', 'c'], 
  ['c', 'a', 'b'], 
  ['d', 'd', 'c'], 
  ['a', 'a', 'a']]
], dtype=object)

我想计算每两个矩阵之间的相似度。我们以前两个矩阵为例。它们分别有 4 行和 3 行。我想检查所有 4 x 3 对的字符串匹配。在每一对中,如果它们相同,我们说每对字符串(只比较 0-0、1-1、2-2)之间的差异为 0,否则为 1。这将 returns 一个二元向量 diff 然后被送入平方指数(或 RBF)内核以获得 这对行 之间的相似性分数。然后我计算所有行对(0-0、0-1、0-2、1-0、...、3-0、3-1、3-2)之间的相似度分数并将它们加在一起,然后这个值是第一个和第二个矩阵之间的最终相似度。然后我对所有矩阵对都这样做,我可以获得最终的相似性内核 R 及其规范化版本 K。下面是我的实现:

from itertools import product
import math

def kernel(data, sigma=1.):
    # Initialize the similarity kernel
    R = np.zeros(shape=(len(data), len(data)))

    # Get every pair of matrices (including themselves)
    for iprod in list(product(enumerate(data), enumerate(data))):
        idxs, prod = zip(*[(i, c) for i, c in iprod])
        ks = []

        # Get every pair of rows between the two matrices
        for pair in list(product(*prod)):
            diff = (np.asarray(pair[0]) != np.asarray(pair[1])).astype(int)

            # Squared exponential kernel
            k = math.exp(-np.dot(diff, diff) / (2*sigma**2))
            ks.append(k)

        # Calculate sum and insert it into R[i,j] and R[j,i]
        ktot = np.sum(ks)
        R[idxs[0],idxs[1]] = ktot
        R[idxs[1],idxs[0]] = ktot

    # Normalize the similarity matrix R
    d = np.diag(R)**-0.5
    K = np.diag(d).dot(R).dot(np.diag(d))

    return K

K = kernel(data)
print(K)

输出:

[[1.         0.81009275 0.71374617 0.7365101  0.81061262]
 [0.81009275 1.         0.68228781 0.70349301 0.82009247]
 [0.71374617 0.68228781 1.         0.78137859 0.68976163]
 [0.7365101  0.70349301 0.78137859 1.         0.7365101 ]
 [0.81061262 0.82009247 0.68976163 0.7365101  1.        ]]

%timeit 2.86 ms ± 64.8 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

但是,当有更多训练数据或每个矩阵中有更多行时,我的代码变得非常慢。我的猜测是,这是因为我使用 itertools.product 太多,没有矩阵运算。我觉得应该有一个完全矢量化的 numpy 方法来做到这一点。我正在考虑 np.covnp.kron 但不知道它如何适用于字符串。任何建议表示赞赏。

您可以使用 Numba 的 JIT 来加快计算的最热循环。但是,Numba 还不能很好地支持字符串。因此,策略是首先 将字符串转换为因子 (唯一整数标识字符串)。这种方法的好处还在于显着加快行比较。此外,还可以应用一些额外的优化:

  • 简单的嵌套循环
  • 替换缓慢的product调用
  • 即时求和计算
  • 手动计算范数(因为!=的结果是0|1布尔值)
  • 利用对称性将操作数除以2

这是最终的实现:

import numba as nb
import numpy as np
from itertools import product


# Actual computational kernel
# Work on a list of 2D arrays containings 32-bit integers contiguously stored in memory
@nb.njit('float64[:,::1](ListType(int32[:,::1]), float64)')
def computePairs(data, sigma):
    # Initialize the similarity kernel
    matCount = len(data)
    R = np.ones(shape=(matCount, matCount))
    normFactor = -1.0 / (2*sigma**2)

    # Get every pair of matrices (including themselves)
    for i in range(matCount):
        for j in range(i, matCount): # Note the matrix is symetric
            ktot = 0.0

            # Get every pair of rows between the two matrices
            for a in data[i]:
                for b in data[j]:
                    sqNorm = 0

                    for k in range(len(a)):
                        sqNorm += a[k] != b[k]

                    # Squared exponential kernel
                    ktot += np.exp(sqNorm * normFactor)

            R[i,j] = ktot
            R[j,i] = ktot

    return R

# Transform the list of list of list of strings into a list of 2D factor arrays
def convertData(data):
    labelToId = {}
    labelCount = 0
    result = []
    
    for labelMat in data:
        height = len(labelMat)
        factorMat = np.empty((height, 3), dtype=np.int32)

        for rowId in range(height):
            row = labelMat[rowId]
            for cellId in range(3):
                cell = row[cellId]

                if cell in labelToId:
                    labelId = labelToId[cell]
                else:
                    labelId = labelCount
                    labelToId[cell] = labelCount
                    labelCount += 1

                factorMat[rowId, cellId] = labelId

        result.append(factorMat)

    return nb.typed.List(result)

def kernel_fast(data, sigma=1.):
    data = convertData(data)

    R = computePairs(data, sigma)

    # Normalize the similarity matrix R
    d = np.diag(R)**-0.5
    K = np.diag(d).dot(R).dot(np.diag(d))

    return K

在我的机器上提供的小型输入示例中,这大约快 45 倍大约 70% 的时间用于数据转换。这意味着如果输入直接适合所提供的算法,计算速度可以快大约 3 倍。

如果您打算处理更大或更多的矩阵,请注意您可以并行化上述算法以使其更快(对于小矩阵不值得输入)。

如果您替换内部循环矢量化调用,对于示例中提供的列表,您可以获得大约一个数量级的速度。我不知道如何向量化外循环,因为 data 由不同形状的数组组成......无论如何,在下面的代码片段中,我们使用广播以成对的方式在数组之间进行操作,以及 einsum 额外的凉爽 ;)。改进没有从@Jérôme Richard 那里获得的那么多,但它只使用 NumPy!

def kernel_oneLoop(data, sigma = 1.):
    # Convert matrices to arrays first
    data = np.array([np.asarray(d, dtype = object) for d in data])
    # Initialize the similarity kernel
    R = np.zeros(shape=(data.size, data.size))
    # Iterate through upper triangular matrix indices
    idx0, idx1 = np.triu_indices_from(R)
    for i in range(idx0.size):
        diff = (data[idx0[i]] != data[idx1[i]][:,None]).astype(int)
        # Squared exponential kernel (no need to square as they're 0's and 1's)
        k = np.exp(-diff.sum(axis=-1) / 2*sigma**2)
        # Calculate sum and insert it into R[i,j] and R[j,i]
        R[idx0[i],idx1[i]] = k.sum()
    # Normalize the similarity matrix R
    d = np.diag(R)**-0.5
    K = np.einsum("i,ij,j->ij", d, R, d)
    # Symmetrize
    K = K + K.T - np.diag(np.diag(K))
    
    return K

Edit:正如 Jérôme 所指出的,由于输出是一个对称数组,我们可以只遍历数组的上三角索引。我相应地更新了代码。