有没有办法矢量化这种动态时间扭曲算法?
Is there a way to vectorise this dynamic time warping algorithm?
动态时间规整算法提供了两个时间序列之间距离的概念,这两个时间序列的速度可能会有所不同。如果我有 N 个序列要相互比较,我可以通过成对应用该算法来构造一个具有 nule 对角线的 NXN 对称矩阵。然而,这对于长二维序列来说非常慢。因此,我试图对代码进行矢量化以加速此矩阵计算。重要的是,我还想提取定义最佳对齐方式的索引。
到目前为止我的成对比较代码:
import math
import numpy as np
seq1 = np.random.randint(100, size=(100, 2)) #Two dim sequences
seq2 = np.random.randint(100, size=(100, 2))
def seqdist(seq1, seq2): # dynamic time warping function
ns = len(seq1)
nt = len(seq2)
D = np.zeros((ns+1, nt+1))+math.inf
D[0, 0] = 0
cost = np.zeros((ns,nt))
for i in range(ns):
for j in range(nt):
cost[i,j] = np.linalg.norm(seq1[i,:]-seq2[j,:])
D[i+1, j+1] = cost[i,j]+min([D[i, j+1], D[i+1, j], D[i, j]])
d = D[ns,nt] # distance
matchidx = [[ns-1, nt-1]] # backwards optimal alignment computation
i = ns
j = nt
for k in range(ns+nt+2):
idx = np.argmin([D[i-1, j], D[i, j-1], D[i-1, j-1]])
if idx == 0 and i > 1 and j > 0:
matchidx.append([i-2, j-1])
i -= 1
elif idx == 1 and i > 0 and j > 1:
matchidx.append([i-1, j-2])
j -= 1
elif idx == 2 and i > 1 and j > 1:
matchidx.append([i-2, j-2])
i -= 1
j -= 1
else:
break
matchidx.reverse()
return d, matchidx
[d,matchidx] = seqdist(seq1,seq2) #try it
这是您的代码中的一个 re-write,它更适合 numba.jit
。这不完全是一个矢量化解决方案,但我看到这个基准测试的速度提高了 230 倍。
from numba import jit
from scipy import spatial
@jit
def D_from_cost(cost, D):
# operates on D inplace
ns, nt = cost.shape
for i in range(ns):
for j in range(nt):
D[i+1, j+1] = cost[i,j]+min(D[i, j+1], D[i+1, j], D[i, j])
# avoiding the list creation inside mean enables better jit performance
# D[i+1, j+1] = cost[i,j]+min([D[i, j+1], D[i+1, j], D[i, j]])
@jit
def get_d(D, matchidx):
ns = D.shape[0] - 1
nt = D.shape[1] - 1
d = D[ns,nt]
matchidx[0,0] = ns - 1
matchidx[0,1] = nt - 1
i = ns
j = nt
for k in range(1, ns+nt+3):
idx = 0
if not (D[i-1,j] <= D[i,j-1] and D[i-1,j] <= D[i-1,j-1]):
if D[i,j-1] <= D[i-1,j-1]:
idx = 1
else:
idx = 2
if idx == 0 and i > 1 and j > 0:
# matchidx.append([i-2, j-1])
matchidx[k,0] = i - 2
matchidx[k,1] = j - 1
i -= 1
elif idx == 1 and i > 0 and j > 1:
# matchidx.append([i-1, j-2])
matchidx[k,0] = i-1
matchidx[k,1] = j-2
j -= 1
elif idx == 2 and i > 1 and j > 1:
# matchidx.append([i-2, j-2])
matchidx[k,0] = i-2
matchidx[k,1] = j-2
i -= 1
j -= 1
else:
break
return d, matchidx[:k]
def seqdist2(seq1, seq2):
ns = len(seq1)
nt = len(seq2)
cost = spatial.distance_matrix(seq1, seq2)
# initialize and update D
D = np.full((ns+1, nt+1), np.inf)
D[0, 0] = 0
D_from_cost(cost, D)
matchidx = np.zeros((ns+nt+2,2), dtype=np.int)
d, matchidx = get_d(D, matchidx)
return d, matchidx[::-1].tolist()
assert seqdist2(seq1, seq2) == seqdist(seq1, seq2)
%timeit seqdist2(seq1, seq2) # 1000 loops, best of 3: 365 µs per loop
%timeit seqdist(seq1, seq2) # 10 loops, best of 3: 86.1 ms per loop
这里有一些变化:
cost
是用spatial.distance_matrix
. 计算出来的
idx
的定义被一堆丑陋的 if 语句所取代,这使得编译代码更快。
min([D[i, j+1], D[i+1, j], D[i, j]])
被替换为 min(D[i, j+1], D[i+1, j], D[i, j])
,即我们不取列表中的最小值,而是取三个值中的最小值。这导致 jit
. 下的惊人加速
matchidx
被预先分配为一个 numpy 数组,并在输出之前被截断为正确的大小。
动态时间规整算法提供了两个时间序列之间距离的概念,这两个时间序列的速度可能会有所不同。如果我有 N 个序列要相互比较,我可以通过成对应用该算法来构造一个具有 nule 对角线的 NXN 对称矩阵。然而,这对于长二维序列来说非常慢。因此,我试图对代码进行矢量化以加速此矩阵计算。重要的是,我还想提取定义最佳对齐方式的索引。
到目前为止我的成对比较代码:
import math
import numpy as np
seq1 = np.random.randint(100, size=(100, 2)) #Two dim sequences
seq2 = np.random.randint(100, size=(100, 2))
def seqdist(seq1, seq2): # dynamic time warping function
ns = len(seq1)
nt = len(seq2)
D = np.zeros((ns+1, nt+1))+math.inf
D[0, 0] = 0
cost = np.zeros((ns,nt))
for i in range(ns):
for j in range(nt):
cost[i,j] = np.linalg.norm(seq1[i,:]-seq2[j,:])
D[i+1, j+1] = cost[i,j]+min([D[i, j+1], D[i+1, j], D[i, j]])
d = D[ns,nt] # distance
matchidx = [[ns-1, nt-1]] # backwards optimal alignment computation
i = ns
j = nt
for k in range(ns+nt+2):
idx = np.argmin([D[i-1, j], D[i, j-1], D[i-1, j-1]])
if idx == 0 and i > 1 and j > 0:
matchidx.append([i-2, j-1])
i -= 1
elif idx == 1 and i > 0 and j > 1:
matchidx.append([i-1, j-2])
j -= 1
elif idx == 2 and i > 1 and j > 1:
matchidx.append([i-2, j-2])
i -= 1
j -= 1
else:
break
matchidx.reverse()
return d, matchidx
[d,matchidx] = seqdist(seq1,seq2) #try it
这是您的代码中的一个 re-write,它更适合 numba.jit
。这不完全是一个矢量化解决方案,但我看到这个基准测试的速度提高了 230 倍。
from numba import jit
from scipy import spatial
@jit
def D_from_cost(cost, D):
# operates on D inplace
ns, nt = cost.shape
for i in range(ns):
for j in range(nt):
D[i+1, j+1] = cost[i,j]+min(D[i, j+1], D[i+1, j], D[i, j])
# avoiding the list creation inside mean enables better jit performance
# D[i+1, j+1] = cost[i,j]+min([D[i, j+1], D[i+1, j], D[i, j]])
@jit
def get_d(D, matchidx):
ns = D.shape[0] - 1
nt = D.shape[1] - 1
d = D[ns,nt]
matchidx[0,0] = ns - 1
matchidx[0,1] = nt - 1
i = ns
j = nt
for k in range(1, ns+nt+3):
idx = 0
if not (D[i-1,j] <= D[i,j-1] and D[i-1,j] <= D[i-1,j-1]):
if D[i,j-1] <= D[i-1,j-1]:
idx = 1
else:
idx = 2
if idx == 0 and i > 1 and j > 0:
# matchidx.append([i-2, j-1])
matchidx[k,0] = i - 2
matchidx[k,1] = j - 1
i -= 1
elif idx == 1 and i > 0 and j > 1:
# matchidx.append([i-1, j-2])
matchidx[k,0] = i-1
matchidx[k,1] = j-2
j -= 1
elif idx == 2 and i > 1 and j > 1:
# matchidx.append([i-2, j-2])
matchidx[k,0] = i-2
matchidx[k,1] = j-2
i -= 1
j -= 1
else:
break
return d, matchidx[:k]
def seqdist2(seq1, seq2):
ns = len(seq1)
nt = len(seq2)
cost = spatial.distance_matrix(seq1, seq2)
# initialize and update D
D = np.full((ns+1, nt+1), np.inf)
D[0, 0] = 0
D_from_cost(cost, D)
matchidx = np.zeros((ns+nt+2,2), dtype=np.int)
d, matchidx = get_d(D, matchidx)
return d, matchidx[::-1].tolist()
assert seqdist2(seq1, seq2) == seqdist(seq1, seq2)
%timeit seqdist2(seq1, seq2) # 1000 loops, best of 3: 365 µs per loop
%timeit seqdist(seq1, seq2) # 10 loops, best of 3: 86.1 ms per loop
这里有一些变化:
cost
是用spatial.distance_matrix
. 计算出来的
idx
的定义被一堆丑陋的 if 语句所取代,这使得编译代码更快。min([D[i, j+1], D[i+1, j], D[i, j]])
被替换为min(D[i, j+1], D[i+1, j], D[i, j])
,即我们不取列表中的最小值,而是取三个值中的最小值。这导致jit
. 下的惊人加速
matchidx
被预先分配为一个 numpy 数组,并在输出之前被截断为正确的大小。