如何通过向量化在 python 中加速此 DP 函数
How to speed up this DP function in python with vectorization
所以我这里有这个定义,
DP[i,j] = f[i,j] + min(DP[i−1, j −1], DP[i−1, j], DP[i−1, j +1])
它定义了从 NxM 矩阵顶部到矩阵底部的最小应计成本。 f 中的每个单元格代表一个 value/cost(1.2、0、10 等)从另一个单元格前往该单元格。
矩阵可能很大(1500x1500,它是 Gradient map of an image),我编写的 DP 算法对我的矩阵来说大约是每 运行 秒。该矩阵每次执行需要 运行 数百次,因此总程序 运行 时间长达几分钟。这个循环大约是我瓶颈的 99%,所以我试图用 Python/numpys 向量化方法优化这个循环。我只能访问 Numpy,并且 Scipy.
注意:我几乎不在 python 中编程,所以解决方案可能很明显。
第一次尝试,只是简单的循环,这里的时间大约是每个 运行
2-2.5 秒
DP = f.copy()
for r in range(2, len(DP) - 1): # Start at row 2 since row one doesn't change
for c in range(1, len(DP[0]) - 1):
DP[r][c] += min(DP[r - 1, c-1:c+2])
第二次尝试,我尝试利用一些 numpy 向量化函数“fromiter”来一次计算整行而不是逐列计算,这里的时间大约是每 运行 1-1.5 秒。我的目标是使它至少快一个数量级,但我不知道如何优化它。
DP = f.copy()
for r in range(2, len(DP) - 1):
def foo(arr):
idx, val = arr
if idx == 0 or idx == len(DP[[0]) - 1:
return np.inf
return val + min(DP[r - 1, idx - 1], DP[r - 1, idx], DP[r - 1, idx + 1])
DP[r, :] = np.fromiter(map(foo, enumerate(DP[r, :])))
正如 hpaulj 所说,由于您的问题本质上是连续的,因此很难完全矢量化,尽管这似乎是可能的(每个单元格都根据行 r=2
的值更新,不同之处在于考虑的数量以下每一行的第 2 行的三元组)所以也许你可以找到一个聪明的方法来做到这一点!
也就是说,一个快速的半向量化解决方案是使用 user42541 提出的执行 sliding windows with fancy indexing 的巧妙方法,因此我们用向量化调用替换内部循环:
indexer = np.arange(3)[:,None] + np.arange(DP.shape[1] - 2)[None,:]
for r in range(2, DP.shape[0] - 1):
DP[r,1:-1] += np.min(DP[r-1,indexer], axis = 0)
相对于您的双循环方法(您的矢量化解决方案在我的电脑上不起作用),对于 1500x1500 整数数组,这会导致大约两个数量级的加速。
所以我这里有这个定义,
DP[i,j] = f[i,j] + min(DP[i−1, j −1], DP[i−1, j], DP[i−1, j +1])
它定义了从 NxM 矩阵顶部到矩阵底部的最小应计成本。 f 中的每个单元格代表一个 value/cost(1.2、0、10 等)从另一个单元格前往该单元格。
矩阵可能很大(1500x1500,它是 Gradient map of an image),我编写的 DP 算法对我的矩阵来说大约是每 运行 秒。该矩阵每次执行需要 运行 数百次,因此总程序 运行 时间长达几分钟。这个循环大约是我瓶颈的 99%,所以我试图用 Python/numpys 向量化方法优化这个循环。我只能访问 Numpy,并且 Scipy.
注意:我几乎不在 python 中编程,所以解决方案可能很明显。
第一次尝试,只是简单的循环,这里的时间大约是每个 运行
2-2.5 秒DP = f.copy()
for r in range(2, len(DP) - 1): # Start at row 2 since row one doesn't change
for c in range(1, len(DP[0]) - 1):
DP[r][c] += min(DP[r - 1, c-1:c+2])
第二次尝试,我尝试利用一些 numpy 向量化函数“fromiter”来一次计算整行而不是逐列计算,这里的时间大约是每 运行 1-1.5 秒。我的目标是使它至少快一个数量级,但我不知道如何优化它。
DP = f.copy()
for r in range(2, len(DP) - 1):
def foo(arr):
idx, val = arr
if idx == 0 or idx == len(DP[[0]) - 1:
return np.inf
return val + min(DP[r - 1, idx - 1], DP[r - 1, idx], DP[r - 1, idx + 1])
DP[r, :] = np.fromiter(map(foo, enumerate(DP[r, :])))
正如 hpaulj 所说,由于您的问题本质上是连续的,因此很难完全矢量化,尽管这似乎是可能的(每个单元格都根据行 r=2
的值更新,不同之处在于考虑的数量以下每一行的第 2 行的三元组)所以也许你可以找到一个聪明的方法来做到这一点!
也就是说,一个快速的半向量化解决方案是使用 user42541 提出的执行 sliding windows with fancy indexing 的巧妙方法,因此我们用向量化调用替换内部循环:
indexer = np.arange(3)[:,None] + np.arange(DP.shape[1] - 2)[None,:]
for r in range(2, DP.shape[0] - 1):
DP[r,1:-1] += np.min(DP[r-1,indexer], axis = 0)
相对于您的双循环方法(您的矢量化解决方案在我的电脑上不起作用),对于 1500x1500 整数数组,这会导致大约两个数量级的加速。