计算字符串矩阵之间相似性内核的最快方法?
Fastest way to calculate the similarity kernel between matrices of strings?
假设我有一个名为 data
的包含 5 个矩阵的列表。每个矩阵都有任意数量的行,但只有 3 列包含 3 个字符串。我想训练一个高斯过程模型,假设 data
是我的训练集。我想根据每对矩阵的字符串匹配来计算相似度核。假设这 5 个矩阵如下所示:
import numpy as np
data = np.array([# Matrix 0
[['b', 'a', 'c'],
['a', 'a', 'b'],
['d', 'c', 'c'],
['a', 'b', 'd']],
# Matrix 1
[['d', 'a', 'c'],
['a', 'b', 'c'],
['a', 'd', 'd']],
# Matrix 2
[['a', 'b', 'b'],
['d', 'a', 'd'],
['d', 'b', 'a'],
['c', 'b', 'd']],
# Matrix 3
[['b', 'b', 'c'],
['a', 'b', 'b'],
['a', 'c', 'a'],
['c', 'b', 'a'],
['b', 'd', 'd']],
# Matrix 4
[['a', 'b', 'c'],
['c', 'a', 'b'],
['d', 'd', 'c'],
['a', 'a', 'a']]
], dtype=object)
我想计算每两个矩阵之间的相似度。我们以前两个矩阵为例。它们分别有 4 行和 3 行。我想检查所有 4 x 3
对的字符串匹配。在每一对中,如果它们相同,我们说每对字符串(只比较 0-0、1-1、2-2)之间的差异为 0
,否则为 1
。这将 returns 一个二元向量 diff
然后被送入平方指数(或 RBF)内核以获得 这对行 之间的相似性分数。然后我计算所有行对(0-0、0-1、0-2、1-0、...、3-0、3-1、3-2)之间的相似度分数并将它们加在一起,然后这个值是第一个和第二个矩阵之间的最终相似度。然后我对所有矩阵对都这样做,我可以获得最终的相似性内核 R
及其规范化版本 K
。下面是我的实现:
from itertools import product
import math
def kernel(data, sigma=1.):
# Initialize the similarity kernel
R = np.zeros(shape=(len(data), len(data)))
# Get every pair of matrices (including themselves)
for iprod in list(product(enumerate(data), enumerate(data))):
idxs, prod = zip(*[(i, c) for i, c in iprod])
ks = []
# Get every pair of rows between the two matrices
for pair in list(product(*prod)):
diff = (np.asarray(pair[0]) != np.asarray(pair[1])).astype(int)
# Squared exponential kernel
k = math.exp(-np.dot(diff, diff) / (2*sigma**2))
ks.append(k)
# Calculate sum and insert it into R[i,j] and R[j,i]
ktot = np.sum(ks)
R[idxs[0],idxs[1]] = ktot
R[idxs[1],idxs[0]] = ktot
# Normalize the similarity matrix R
d = np.diag(R)**-0.5
K = np.diag(d).dot(R).dot(np.diag(d))
return K
K = kernel(data)
print(K)
输出:
[[1. 0.81009275 0.71374617 0.7365101 0.81061262]
[0.81009275 1. 0.68228781 0.70349301 0.82009247]
[0.71374617 0.68228781 1. 0.78137859 0.68976163]
[0.7365101 0.70349301 0.78137859 1. 0.7365101 ]
[0.81061262 0.82009247 0.68976163 0.7365101 1. ]]
%timeit 2.86 ms ± 64.8 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
但是,当有更多训练数据或每个矩阵中有更多行时,我的代码变得非常慢。我的猜测是,这是因为我使用 itertools.product
太多,没有矩阵运算。我觉得应该有一个完全矢量化的 numpy
方法来做到这一点。我正在考虑 np.cov
和 np.kron
但不知道它如何适用于字符串。任何建议表示赞赏。
您可以使用 Numba 的 JIT 来加快计算的最热循环。但是,Numba 还不能很好地支持字符串。因此,策略是首先 将字符串转换为因子 (唯一整数标识字符串)。这种方法的好处还在于显着加快行比较。此外,还可以应用一些额外的优化:
- 用简单的嵌套循环
替换缓慢的product
调用
- 即时求和计算
- 手动计算范数(因为
!=
的结果是0|1布尔值)
- 利用对称性将操作数除以2
这是最终的实现:
import numba as nb
import numpy as np
from itertools import product
# Actual computational kernel
# Work on a list of 2D arrays containings 32-bit integers contiguously stored in memory
@nb.njit('float64[:,::1](ListType(int32[:,::1]), float64)')
def computePairs(data, sigma):
# Initialize the similarity kernel
matCount = len(data)
R = np.ones(shape=(matCount, matCount))
normFactor = -1.0 / (2*sigma**2)
# Get every pair of matrices (including themselves)
for i in range(matCount):
for j in range(i, matCount): # Note the matrix is symetric
ktot = 0.0
# Get every pair of rows between the two matrices
for a in data[i]:
for b in data[j]:
sqNorm = 0
for k in range(len(a)):
sqNorm += a[k] != b[k]
# Squared exponential kernel
ktot += np.exp(sqNorm * normFactor)
R[i,j] = ktot
R[j,i] = ktot
return R
# Transform the list of list of list of strings into a list of 2D factor arrays
def convertData(data):
labelToId = {}
labelCount = 0
result = []
for labelMat in data:
height = len(labelMat)
factorMat = np.empty((height, 3), dtype=np.int32)
for rowId in range(height):
row = labelMat[rowId]
for cellId in range(3):
cell = row[cellId]
if cell in labelToId:
labelId = labelToId[cell]
else:
labelId = labelCount
labelToId[cell] = labelCount
labelCount += 1
factorMat[rowId, cellId] = labelId
result.append(factorMat)
return nb.typed.List(result)
def kernel_fast(data, sigma=1.):
data = convertData(data)
R = computePairs(data, sigma)
# Normalize the similarity matrix R
d = np.diag(R)**-0.5
K = np.diag(d).dot(R).dot(np.diag(d))
return K
在我的机器上提供的小型输入示例中,这大约快 45 倍。 大约 70% 的时间用于数据转换。这意味着如果输入直接适合所提供的算法,计算速度可以快大约 3 倍。
如果您打算处理更大或更多的矩阵,请注意您可以并行化上述算法以使其更快(对于小矩阵不值得输入)。
如果您替换内部循环矢量化调用,对于示例中提供的列表,您可以获得大约一个数量级的速度。我不知道如何向量化外循环,因为 data
由不同形状的数组组成......无论如何,在下面的代码片段中,我们使用广播以成对的方式在数组之间进行操作,以及 einsum
额外的凉爽 ;)。改进没有从@Jérôme Richard 那里获得的那么多,但它只使用 NumPy!
def kernel_oneLoop(data, sigma = 1.):
# Convert matrices to arrays first
data = np.array([np.asarray(d, dtype = object) for d in data])
# Initialize the similarity kernel
R = np.zeros(shape=(data.size, data.size))
# Iterate through upper triangular matrix indices
idx0, idx1 = np.triu_indices_from(R)
for i in range(idx0.size):
diff = (data[idx0[i]] != data[idx1[i]][:,None]).astype(int)
# Squared exponential kernel (no need to square as they're 0's and 1's)
k = np.exp(-diff.sum(axis=-1) / 2*sigma**2)
# Calculate sum and insert it into R[i,j] and R[j,i]
R[idx0[i],idx1[i]] = k.sum()
# Normalize the similarity matrix R
d = np.diag(R)**-0.5
K = np.einsum("i,ij,j->ij", d, R, d)
# Symmetrize
K = K + K.T - np.diag(np.diag(K))
return K
Edit:正如 Jérôme 所指出的,由于输出是一个对称数组,我们可以只遍历数组的上三角索引。我相应地更新了代码。
假设我有一个名为 data
的包含 5 个矩阵的列表。每个矩阵都有任意数量的行,但只有 3 列包含 3 个字符串。我想训练一个高斯过程模型,假设 data
是我的训练集。我想根据每对矩阵的字符串匹配来计算相似度核。假设这 5 个矩阵如下所示:
import numpy as np
data = np.array([# Matrix 0
[['b', 'a', 'c'],
['a', 'a', 'b'],
['d', 'c', 'c'],
['a', 'b', 'd']],
# Matrix 1
[['d', 'a', 'c'],
['a', 'b', 'c'],
['a', 'd', 'd']],
# Matrix 2
[['a', 'b', 'b'],
['d', 'a', 'd'],
['d', 'b', 'a'],
['c', 'b', 'd']],
# Matrix 3
[['b', 'b', 'c'],
['a', 'b', 'b'],
['a', 'c', 'a'],
['c', 'b', 'a'],
['b', 'd', 'd']],
# Matrix 4
[['a', 'b', 'c'],
['c', 'a', 'b'],
['d', 'd', 'c'],
['a', 'a', 'a']]
], dtype=object)
我想计算每两个矩阵之间的相似度。我们以前两个矩阵为例。它们分别有 4 行和 3 行。我想检查所有 4 x 3
对的字符串匹配。在每一对中,如果它们相同,我们说每对字符串(只比较 0-0、1-1、2-2)之间的差异为 0
,否则为 1
。这将 returns 一个二元向量 diff
然后被送入平方指数(或 RBF)内核以获得 这对行 之间的相似性分数。然后我计算所有行对(0-0、0-1、0-2、1-0、...、3-0、3-1、3-2)之间的相似度分数并将它们加在一起,然后这个值是第一个和第二个矩阵之间的最终相似度。然后我对所有矩阵对都这样做,我可以获得最终的相似性内核 R
及其规范化版本 K
。下面是我的实现:
from itertools import product
import math
def kernel(data, sigma=1.):
# Initialize the similarity kernel
R = np.zeros(shape=(len(data), len(data)))
# Get every pair of matrices (including themselves)
for iprod in list(product(enumerate(data), enumerate(data))):
idxs, prod = zip(*[(i, c) for i, c in iprod])
ks = []
# Get every pair of rows between the two matrices
for pair in list(product(*prod)):
diff = (np.asarray(pair[0]) != np.asarray(pair[1])).astype(int)
# Squared exponential kernel
k = math.exp(-np.dot(diff, diff) / (2*sigma**2))
ks.append(k)
# Calculate sum and insert it into R[i,j] and R[j,i]
ktot = np.sum(ks)
R[idxs[0],idxs[1]] = ktot
R[idxs[1],idxs[0]] = ktot
# Normalize the similarity matrix R
d = np.diag(R)**-0.5
K = np.diag(d).dot(R).dot(np.diag(d))
return K
K = kernel(data)
print(K)
输出:
[[1. 0.81009275 0.71374617 0.7365101 0.81061262]
[0.81009275 1. 0.68228781 0.70349301 0.82009247]
[0.71374617 0.68228781 1. 0.78137859 0.68976163]
[0.7365101 0.70349301 0.78137859 1. 0.7365101 ]
[0.81061262 0.82009247 0.68976163 0.7365101 1. ]]
%timeit 2.86 ms ± 64.8 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
但是,当有更多训练数据或每个矩阵中有更多行时,我的代码变得非常慢。我的猜测是,这是因为我使用 itertools.product
太多,没有矩阵运算。我觉得应该有一个完全矢量化的 numpy
方法来做到这一点。我正在考虑 np.cov
和 np.kron
但不知道它如何适用于字符串。任何建议表示赞赏。
您可以使用 Numba 的 JIT 来加快计算的最热循环。但是,Numba 还不能很好地支持字符串。因此,策略是首先 将字符串转换为因子 (唯一整数标识字符串)。这种方法的好处还在于显着加快行比较。此外,还可以应用一些额外的优化:
- 用简单的嵌套循环 替换缓慢的
- 即时求和计算
- 手动计算范数(因为
!=
的结果是0|1布尔值) - 利用对称性将操作数除以2
product
调用
这是最终的实现:
import numba as nb
import numpy as np
from itertools import product
# Actual computational kernel
# Work on a list of 2D arrays containings 32-bit integers contiguously stored in memory
@nb.njit('float64[:,::1](ListType(int32[:,::1]), float64)')
def computePairs(data, sigma):
# Initialize the similarity kernel
matCount = len(data)
R = np.ones(shape=(matCount, matCount))
normFactor = -1.0 / (2*sigma**2)
# Get every pair of matrices (including themselves)
for i in range(matCount):
for j in range(i, matCount): # Note the matrix is symetric
ktot = 0.0
# Get every pair of rows between the two matrices
for a in data[i]:
for b in data[j]:
sqNorm = 0
for k in range(len(a)):
sqNorm += a[k] != b[k]
# Squared exponential kernel
ktot += np.exp(sqNorm * normFactor)
R[i,j] = ktot
R[j,i] = ktot
return R
# Transform the list of list of list of strings into a list of 2D factor arrays
def convertData(data):
labelToId = {}
labelCount = 0
result = []
for labelMat in data:
height = len(labelMat)
factorMat = np.empty((height, 3), dtype=np.int32)
for rowId in range(height):
row = labelMat[rowId]
for cellId in range(3):
cell = row[cellId]
if cell in labelToId:
labelId = labelToId[cell]
else:
labelId = labelCount
labelToId[cell] = labelCount
labelCount += 1
factorMat[rowId, cellId] = labelId
result.append(factorMat)
return nb.typed.List(result)
def kernel_fast(data, sigma=1.):
data = convertData(data)
R = computePairs(data, sigma)
# Normalize the similarity matrix R
d = np.diag(R)**-0.5
K = np.diag(d).dot(R).dot(np.diag(d))
return K
在我的机器上提供的小型输入示例中,这大约快 45 倍。 大约 70% 的时间用于数据转换。这意味着如果输入直接适合所提供的算法,计算速度可以快大约 3 倍。
如果您打算处理更大或更多的矩阵,请注意您可以并行化上述算法以使其更快(对于小矩阵不值得输入)。
如果您替换内部循环矢量化调用,对于示例中提供的列表,您可以获得大约一个数量级的速度。我不知道如何向量化外循环,因为 data
由不同形状的数组组成......无论如何,在下面的代码片段中,我们使用广播以成对的方式在数组之间进行操作,以及 einsum
额外的凉爽 ;)。改进没有从@Jérôme Richard 那里获得的那么多,但它只使用 NumPy!
def kernel_oneLoop(data, sigma = 1.):
# Convert matrices to arrays first
data = np.array([np.asarray(d, dtype = object) for d in data])
# Initialize the similarity kernel
R = np.zeros(shape=(data.size, data.size))
# Iterate through upper triangular matrix indices
idx0, idx1 = np.triu_indices_from(R)
for i in range(idx0.size):
diff = (data[idx0[i]] != data[idx1[i]][:,None]).astype(int)
# Squared exponential kernel (no need to square as they're 0's and 1's)
k = np.exp(-diff.sum(axis=-1) / 2*sigma**2)
# Calculate sum and insert it into R[i,j] and R[j,i]
R[idx0[i],idx1[i]] = k.sum()
# Normalize the similarity matrix R
d = np.diag(R)**-0.5
K = np.einsum("i,ij,j->ij", d, R, d)
# Symmetrize
K = K + K.T - np.diag(np.diag(K))
return K
Edit:正如 Jérôme 所指出的,由于输出是一个对称数组,我们可以只遍历数组的上三角索引。我相应地更新了代码。