在不复制的情况下将稀疏矩阵乘以数组
Multiplying a sparse matrix by an array without copying
我有一个一维数组(大小为 M
的向量),一个相当大的数组,我绝对不想在内存中复制它。我还有一个windowN
的稀疏矩阵(任意大小,基本上除了对角线&N
伪对角线之外的所有元素都是零)。
我想用向量乘以这个稀疏矩阵,而不必在内存中复制向量。最简单、最有效的方法是什么?必须有一个巧妙的解决方案,但我不知道合适的文献,而且我没有受过足够的教育来解决这个问题。
N=1
有一个解决方案(其中矩阵是:a
在对角线上,b
在两个最接近的伪对角线上)。解决方案看起来像这样(例如,在 python 中):
tmp2 = array[0]
i = 1
while (i < len(array) - 1):
tmp1 = b * (array[i - 1] + array[i + 1]) + a * array[i]
array[i - 1] = tmp2
i += 1
tmp2 = b * (array[i - 1] + array[i + 1]) + a * array[i]
array[i - 1] = tmp1
i += 1
但我无法将其概括为任意 N
。
注释:我绝对不想在内存中复制大小 M
向量。但是,使用大小为 2N+1
的临时数组是可以的,因为 M >> N
。我正在寻找一个实际的算法描述,而不是一个智能的自定义库来完成这项工作。
提前致谢!
考虑矩阵
[
1 2 3 0 0 0
2 1 2 4 0 0
3 2 1 2 5 0
0 7 2 1 2 6
0 0 8 2 1 2
0 0 0 9 2 1
]
和向量 v [1,2,3,4,5,6]
对于每一行,在v
的涉及系数下方:
[1,2,3]
[1,2,3,4]
[1,2,3,4,5]
[ 2,3,4,5,6]
[ 3,4,5,6]
[ 4,5,6]
如您所见,您只需要跟踪 v
个 window。
那个window原来是[1,2,3,4,5]
(for i = 0,1,2)
然后每隔 i
将 window 向右移动一次(并最终将其截断以使最后一行不超出 v
的范围...)
现在请注意,当您向右移动时,您只需要知道 v
的下一个值,只要您没有弄脏该值(通过写入 v
)您的新 window 有效。
第i
行,window为[i-n;i+n]
,将修改的系数为v[i]
。对于下一个window,你需要知道v[i+n+1]
哪个还没有被弄脏。一切都很好。
所以算法就像
window = circularbuffer(2n+1) //you push to the right, and if length > 2n+1, you remove first elem
for i = 0; i<v.size()
v[i] = prod(row_i, window) // only for the row_i coeffs...
if i >= n && < M-3
window.push(v[i+n+1])
else if i>= M-3
window.shift() // just remove the first value
const N = 2
const M_SIZE = 10
function toString(M){
return M.map(x=>x.join(' ')).join('\n')
}
const { M, v } = (_ => {
const M = Array(M_SIZE).fill(0).map(x=>Array(M_SIZE).fill(0))
let z = 1
for(let i = 0; i<M_SIZE; ++i){
for(let j = -N; j<=N; ++j){
if(i+j >= 0 && i+j <M_SIZE){
M[i][i+j] = (z++ % (N*2))+1
}
}
}
const v = Array(M.length).fill(0).map((x,i)=>i)
return { M, v}
})()
function classic(M, v){
return M.map(r => r.reduce((acc, x, j) => acc + v[j]*x, 0))
}
function inplace(M, v){
// captn inefficiency
const circBuf = (init => {
let buf = init
return {
push (x) {
buf.push(x)
buf.shift()
},
shift() {
buf.shift()
},
at (i) { return buf[i] },
toString() {
return buf.join(' ')
}
}
})(v.slice(0, 2 * N + 1))
const sparseProd = (row, buf) => {
let s = 0
row.forEach((x, j) => s += x * buf.at(j))
return s
}
const sparseRows = M.map(r => r.filter(x => x !== 0))
sparseRows.forEach((row, i) => {
v[i] = sparseProd(row, circBuf)
if (i >= sparseRows.length - 3 ) {
circBuf.shift()
} else {
if (i >= N) {
circBuf.push(v[i + N + 1])
}
}
})
}
console.log('classic prod', classic(M, v))
inplace(M, v)
console.log('inplace prod', v)
所以我最终做了这样的事情。这似乎是对 N=1 情况所做的概括。
一般来说,我的权重基本上是稀疏矩阵中中心行的非零分量。 IE。如果矩阵看起来像这样(如评论中所述,它通常是对称的,但不一定):
| a b c 0 0 ... 0 0 0 0 0 |
| b a b c 0 ... 0 0 0 0 0 |
| c b a b c ... 0 0 0 0 0 |
| 0 c b a b ... 0 0 0 0 0 |
| 0 0 c b a ... 0 0 0 0 0 |
| ... ... ... |
| 0 0 0 0 0 ... a b c 0 0 |
| 0 0 0 0 0 ... b a b c 0 |
| 0 0 0 0 0 ... c b a b c |
| 0 0 0 0 0 ... 0 c b a b |
| 0 0 0 0 0 ... 0 0 c b a |
那么权重向量就是[c, b, a, b, c]
(即N = 2
)。
所以对于 N = ntimes
我结束做这样的事情的一般情况:
def sparse_multiply(array, weights):
ntimes = (len(weights) - 1) / 2
# reduced dot product
def product(a_, i_, w_):
dot = 0.0
for k, j in enumerate(range(i_ - ntimes, i_ + ntimes + 1)):
if (j >= 0 and j < len(a_)):
dot += a_[j] * w_[k]
return dot
tmp = np.zeros(ntimes + 1)
for i in range(ntimes):
tmp[i] = array[i]
i = ntimes
while (i <= len(array)):
for t in range(-1, ntimes):
tmp[t] = product(array, i, w)
array[i - ntimes] = tmp[t + 1]
i += 1
return array
您所做的唯一牺牲是大小为 O(N
) 的临时数组,这很好,因为正如我所说,N << M
.
是的,是的,我知道一些操作(比如减少点积)可以用一些 python 魔法来完成。但我的意思是将其转移到旧学校 C/Fortran
,这样不会有太大帮助。
应用程序
实际上,我感兴趣的应用程序是应用高斯滤波器:a_i = 0.5 * a_i + 0.25 * (a_{i-1} + a_{i+1})
到数组 N
次,而无需执行 N
遍,也不必复制整个数组.
那么你可以做的是,你可以将对角线上 0.5
和伪对角线 0.25
的稀疏矩阵提高到 N
次方,你将结束得到权重向量和一个看起来像我之前展示的矩阵(但具有 N
非零伪对角线)。然后你可以使用上面的方法将这些权重应用于数组,这样你就不必在必须将它用于其他组件之前修改 a_i
,但同时不用复制整个数组就可以离开。
PS。不幸的是,我没有完全理解 @grodzi 的回答。多一点解释肯定会有帮助。
我有一个一维数组(大小为 M
的向量),一个相当大的数组,我绝对不想在内存中复制它。我还有一个windowN
的稀疏矩阵(任意大小,基本上除了对角线&N
伪对角线之外的所有元素都是零)。
我想用向量乘以这个稀疏矩阵,而不必在内存中复制向量。最简单、最有效的方法是什么?必须有一个巧妙的解决方案,但我不知道合适的文献,而且我没有受过足够的教育来解决这个问题。
N=1
有一个解决方案(其中矩阵是:a
在对角线上,b
在两个最接近的伪对角线上)。解决方案看起来像这样(例如,在 python 中):
tmp2 = array[0]
i = 1
while (i < len(array) - 1):
tmp1 = b * (array[i - 1] + array[i + 1]) + a * array[i]
array[i - 1] = tmp2
i += 1
tmp2 = b * (array[i - 1] + array[i + 1]) + a * array[i]
array[i - 1] = tmp1
i += 1
但我无法将其概括为任意 N
。
注释:我绝对不想在内存中复制大小 M
向量。但是,使用大小为 2N+1
的临时数组是可以的,因为 M >> N
。我正在寻找一个实际的算法描述,而不是一个智能的自定义库来完成这项工作。
提前致谢!
考虑矩阵
[
1 2 3 0 0 0
2 1 2 4 0 0
3 2 1 2 5 0
0 7 2 1 2 6
0 0 8 2 1 2
0 0 0 9 2 1
]
和向量 v [1,2,3,4,5,6]
对于每一行,在v
的涉及系数下方:
[1,2,3]
[1,2,3,4]
[1,2,3,4,5]
[ 2,3,4,5,6]
[ 3,4,5,6]
[ 4,5,6]
如您所见,您只需要跟踪 v
个 window。
那个window原来是[1,2,3,4,5]
(for i = 0,1,2)
然后每隔 i
将 window 向右移动一次(并最终将其截断以使最后一行不超出 v
的范围...)
现在请注意,当您向右移动时,您只需要知道 v
的下一个值,只要您没有弄脏该值(通过写入 v
)您的新 window 有效。
第i
行,window为[i-n;i+n]
,将修改的系数为v[i]
。对于下一个window,你需要知道v[i+n+1]
哪个还没有被弄脏。一切都很好。
所以算法就像
window = circularbuffer(2n+1) //you push to the right, and if length > 2n+1, you remove first elem
for i = 0; i<v.size()
v[i] = prod(row_i, window) // only for the row_i coeffs...
if i >= n && < M-3
window.push(v[i+n+1])
else if i>= M-3
window.shift() // just remove the first value
const N = 2
const M_SIZE = 10
function toString(M){
return M.map(x=>x.join(' ')).join('\n')
}
const { M, v } = (_ => {
const M = Array(M_SIZE).fill(0).map(x=>Array(M_SIZE).fill(0))
let z = 1
for(let i = 0; i<M_SIZE; ++i){
for(let j = -N; j<=N; ++j){
if(i+j >= 0 && i+j <M_SIZE){
M[i][i+j] = (z++ % (N*2))+1
}
}
}
const v = Array(M.length).fill(0).map((x,i)=>i)
return { M, v}
})()
function classic(M, v){
return M.map(r => r.reduce((acc, x, j) => acc + v[j]*x, 0))
}
function inplace(M, v){
// captn inefficiency
const circBuf = (init => {
let buf = init
return {
push (x) {
buf.push(x)
buf.shift()
},
shift() {
buf.shift()
},
at (i) { return buf[i] },
toString() {
return buf.join(' ')
}
}
})(v.slice(0, 2 * N + 1))
const sparseProd = (row, buf) => {
let s = 0
row.forEach((x, j) => s += x * buf.at(j))
return s
}
const sparseRows = M.map(r => r.filter(x => x !== 0))
sparseRows.forEach((row, i) => {
v[i] = sparseProd(row, circBuf)
if (i >= sparseRows.length - 3 ) {
circBuf.shift()
} else {
if (i >= N) {
circBuf.push(v[i + N + 1])
}
}
})
}
console.log('classic prod', classic(M, v))
inplace(M, v)
console.log('inplace prod', v)
所以我最终做了这样的事情。这似乎是对 N=1 情况所做的概括。
一般来说,我的权重基本上是稀疏矩阵中中心行的非零分量。 IE。如果矩阵看起来像这样(如评论中所述,它通常是对称的,但不一定):
| a b c 0 0 ... 0 0 0 0 0 |
| b a b c 0 ... 0 0 0 0 0 |
| c b a b c ... 0 0 0 0 0 |
| 0 c b a b ... 0 0 0 0 0 |
| 0 0 c b a ... 0 0 0 0 0 |
| ... ... ... |
| 0 0 0 0 0 ... a b c 0 0 |
| 0 0 0 0 0 ... b a b c 0 |
| 0 0 0 0 0 ... c b a b c |
| 0 0 0 0 0 ... 0 c b a b |
| 0 0 0 0 0 ... 0 0 c b a |
那么权重向量就是[c, b, a, b, c]
(即N = 2
)。
所以对于 N = ntimes
我结束做这样的事情的一般情况:
def sparse_multiply(array, weights):
ntimes = (len(weights) - 1) / 2
# reduced dot product
def product(a_, i_, w_):
dot = 0.0
for k, j in enumerate(range(i_ - ntimes, i_ + ntimes + 1)):
if (j >= 0 and j < len(a_)):
dot += a_[j] * w_[k]
return dot
tmp = np.zeros(ntimes + 1)
for i in range(ntimes):
tmp[i] = array[i]
i = ntimes
while (i <= len(array)):
for t in range(-1, ntimes):
tmp[t] = product(array, i, w)
array[i - ntimes] = tmp[t + 1]
i += 1
return array
您所做的唯一牺牲是大小为 O(N
) 的临时数组,这很好,因为正如我所说,N << M
.
是的,是的,我知道一些操作(比如减少点积)可以用一些 python 魔法来完成。但我的意思是将其转移到旧学校 C/Fortran
,这样不会有太大帮助。
应用程序
实际上,我感兴趣的应用程序是应用高斯滤波器:a_i = 0.5 * a_i + 0.25 * (a_{i-1} + a_{i+1})
到数组 N
次,而无需执行 N
遍,也不必复制整个数组.
那么你可以做的是,你可以将对角线上 0.5
和伪对角线 0.25
的稀疏矩阵提高到 N
次方,你将结束得到权重向量和一个看起来像我之前展示的矩阵(但具有 N
非零伪对角线)。然后你可以使用上面的方法将这些权重应用于数组,这样你就不必在必须将它用于其他组件之前修改 a_i
,但同时不用复制整个数组就可以离开。
PS。不幸的是,我没有完全理解 @grodzi 的回答。多一点解释肯定会有帮助。