高效构建FEM/FVM矩阵
Efficiently construct FEM/FVM matrix
这是 FEM/FVM 方程系统的典型用例,因此可能具有更广泛的兴趣。来自三角形网格 à la
我想创建一个scipy.sparse.csr_matrix
。矩阵 rows/columns 表示网格节点处的值。该矩阵在主对角线上以及两个节点通过边连接的地方都有条目。
这是一个 MWE,它首先建立节点->边缘->单元关系,然后建立矩阵:
import numpy
import meshzoo
from scipy import sparse
nx = 1600
ny = 1000
verts, cells = meshzoo.rectangle(0.0, 1.61, 0.0, 1.0, nx, ny)
n = len(verts)
nds = cells.T
nodes_edge_cells = numpy.stack([nds[[1, 2]], nds[[2, 0]],nds[[0, 1]]], axis=1)
# assign values to each edge (per cell)
alpha = numpy.random.rand(3, len(cells))
vals = numpy.array([
[alpha**2, -alpha],
[-alpha, alpha**2],
])
# Build I, J, V entries for COO matrix
I = []
J = []
V = []
#
V.append(vals[0][0])
V.append(vals[0][1])
V.append(vals[1][0])
V.append(vals[1][1])
#
I.append(nodes_edge_cells[0])
I.append(nodes_edge_cells[0])
I.append(nodes_edge_cells[1])
I.append(nodes_edge_cells[1])
#
J.append(nodes_edge_cells[0])
J.append(nodes_edge_cells[1])
J.append(nodes_edge_cells[0])
J.append(nodes_edge_cells[1])
# Create suitable data for coo_matrix
I = numpy.concatenate(I).flat
J = numpy.concatenate(J).flat
V = numpy.concatenate(V).flat
matrix = sparse.coo_matrix((V, (I, J)), shape=(n, n))
matrix = matrix.tocsr()
与
python -m cProfile -o profile.prof main.py
snakeviz profile.prof
可以创建和查看以上配置文件:
方法 tocsr()
在这里占用了大部分的运行时间,但是当构建 alpha
更复杂时也是如此。因此,我正在寻找加快速度的方法。
我已经发现:
由于数据的结构,矩阵对角线上的值可以提前求和,即
V.append(vals[0, 0, 0] + vals[1, 1, 2])
I.append(nodes_edge_cells[0, 0]) # == nodes_edge_cells[1, 2]
J.append(nodes_edge_cells[0, 0]) # == nodes_edge_cells[1, 2]
这使得 I
、J
、V
更短,从而加快了 tocsr
。
现在,边缘是 "per cell"。我可以使用 numpy.unique
识别彼此相等的边,有效地节省了大约一半的 I
、J
、V
。但是,我发现这也需要一些时间。 (不足为奇。)
我的另一个想法是,如果有 csr_matrix
-like 数据结构,其中主对角线分开保存。我知道这存在于其他一些软件包中,但在 scipy 中找不到。正确吗?
或许有一种直接构建CSR的明智方法?
我会尝试直接创建 csr 结构,特别是如果您要求助于 np.unique
,因为这会为您提供排序的键,这是完成工作的一半。
我假设你正处于 i, j
按字典顺序排序并重叠 v
使用 np.add.at
对 [= 的可选 inverse
输出求和的位置12=].
那么v
和j
已经是csr格式了。剩下要做的就是创建 indptr
,您只需通过 np.searchsorted(i, np.arange(M+1))
获得,其中 M
是列长度。您可以将这些直接传递给 sparse.csr_matrix
构造函数。
好了,让代码说话:
import numpy as np
from scipy import sparse
from timeit import timeit
def tocsr(I, J, E, N):
n = len(I)
K = np.empty((n,), dtype=np.int64)
K.view(np.int32).reshape(n, 2).T[...] = J, I
S = np.argsort(K)
KS = K[S]
steps = np.flatnonzero(np.r_[1, np.diff(KS)])
ED = np.add.reduceat(E[S], steps)
JD, ID = KS[steps].view(np.int32).reshape(-1, 2).T
ID = np.searchsorted(ID, np.arange(N+1))
return sparse.csr_matrix((ED, np.array(JD, dtype=int), ID), (N, N))
def viacoo(I, J, E, N):
return sparse.coo_matrix((E, (I, J)), (N, N)).tocsr()
#testing and timing
# correctness
N = 1000
A = np.random.random((N, N)) < 0.001
I, J = np.where(A)
E = np.random.random((2, len(I)))
D = np.zeros((2,) + A.shape)
D[:, I, J] = E
D2 = tocsr(np.r_[I, I], np.r_[J, J], E.ravel(), N).A
print('correct:', np.allclose(D.sum(axis=0), D2))
# speed
N = 100000
K = 10
I, J = np.random.randint(0, N, (2, K*N))
E = np.random.random((2 * len(I),))
I, J, E = np.r_[I, I, J, J], np.r_[J, J, I, I], np.r_[E, E]
print('N:', N, ' -- nnz (with duplicates):', len(E))
print('direct: ', timeit('f(a,b,c,d)', number=10, globals={'f': tocsr, 'a': I, 'b': J, 'c': E, 'd': N}), 'secs for 10 iterations')
print('via coo:', timeit('f(a,b,c,d)', number=10, globals={'f': viacoo, 'a': I, 'b': J, 'c': E, 'd': N}), 'secs for 10 iterations')
打印:
correct: True
N: 100000 -- nnz (with duplicates): 4000000
direct: 7.702431229001377 secs for 10 iterations
via coo: 41.813509466010146 secs for 10 iterations
加速:5 倍
所以,最后证明这是 COO 和 CSR 之间的区别 sum_duplicates
(就像@hpaulj 怀疑的那样)。感谢这里涉及的每个人(特别是@paul-panzer)的努力,a PR 正在为 tocsr
提供巨大的加速。
SciPy 的 tocsr
在 (I, J)
上做了一个 lexsort
,所以它有助于以 (I, J)
出来的方式组织索引已经很好地排序了。
对于上例中的nx=4
、ny=2
,I
和J
是
[1 6 3 5 2 7 5 5 7 4 5 6 0 2 2 0 1 2 1 6 3 5 2 7 5 5 7 4 5 6 0 2 2 0 1 2 5 5 7 4 5 6 0 2 2 0 1 2 1 6 3 5 2 7 5 5 7 4 5 6 0 2 2 0 1 2 1 6 3 5 2 7]
[1 6 3 5 2 7 5 5 7 4 5 6 0 2 2 0 1 2 5 5 7 4 5 6 0 2 2 0 1 2 1 6 3 5 2 7 1 6 3 5 2 7 5 5 7 4 5 6 0 2 2 0 1 2 5 5 7 4 5 6 0 2 2 0 1 2 1 6 3 5 2 7]
首先对 cells
的每一行进行排序,然后按第一列对行进行排序
cells = numpy.sort(cells, axis=1)
cells = cells[cells[:, 0].argsort()]
产生
[1 4 2 5 3 6 5 5 5 6 7 7 0 0 1 2 2 2 1 4 2 5 3 6 5 5 5 6 7 7 0 0 1 2 2 2 5 5 5 6 7 7 0 0 1 2 2 2 1 4 2 5 3 6 5 5 5 6 7 7 0 0 1 2 2 2 1 4 2 5 3 6]
[1 4 2 5 3 6 5 5 5 6 7 7 0 0 1 2 2 2 5 5 5 6 7 7 0 0 1 2 2 2 1 4 2 5 3 6 1 4 2 5 3 6 5 5 5 6 7 7 0 0 1 2 2 2 5 5 5 6 7 7 0 0 1 2 2 2 1 4 2 5 3 6]
对于原始 post 中的数字,排序将运行时间从大约 40 秒减少到 8 秒。
如果首先对节点进行更适当的编号,也许可以实现更好的排序。我在考虑 Cuthill-McKee and friends。
这是 FEM/FVM 方程系统的典型用例,因此可能具有更广泛的兴趣。来自三角形网格 à la
我想创建一个scipy.sparse.csr_matrix
。矩阵 rows/columns 表示网格节点处的值。该矩阵在主对角线上以及两个节点通过边连接的地方都有条目。
这是一个 MWE,它首先建立节点->边缘->单元关系,然后建立矩阵:
import numpy
import meshzoo
from scipy import sparse
nx = 1600
ny = 1000
verts, cells = meshzoo.rectangle(0.0, 1.61, 0.0, 1.0, nx, ny)
n = len(verts)
nds = cells.T
nodes_edge_cells = numpy.stack([nds[[1, 2]], nds[[2, 0]],nds[[0, 1]]], axis=1)
# assign values to each edge (per cell)
alpha = numpy.random.rand(3, len(cells))
vals = numpy.array([
[alpha**2, -alpha],
[-alpha, alpha**2],
])
# Build I, J, V entries for COO matrix
I = []
J = []
V = []
#
V.append(vals[0][0])
V.append(vals[0][1])
V.append(vals[1][0])
V.append(vals[1][1])
#
I.append(nodes_edge_cells[0])
I.append(nodes_edge_cells[0])
I.append(nodes_edge_cells[1])
I.append(nodes_edge_cells[1])
#
J.append(nodes_edge_cells[0])
J.append(nodes_edge_cells[1])
J.append(nodes_edge_cells[0])
J.append(nodes_edge_cells[1])
# Create suitable data for coo_matrix
I = numpy.concatenate(I).flat
J = numpy.concatenate(J).flat
V = numpy.concatenate(V).flat
matrix = sparse.coo_matrix((V, (I, J)), shape=(n, n))
matrix = matrix.tocsr()
与
python -m cProfile -o profile.prof main.py
snakeviz profile.prof
可以创建和查看以上配置文件:
方法 tocsr()
在这里占用了大部分的运行时间,但是当构建 alpha
更复杂时也是如此。因此,我正在寻找加快速度的方法。
我已经发现:
由于数据的结构,矩阵对角线上的值可以提前求和,即
V.append(vals[0, 0, 0] + vals[1, 1, 2]) I.append(nodes_edge_cells[0, 0]) # == nodes_edge_cells[1, 2] J.append(nodes_edge_cells[0, 0]) # == nodes_edge_cells[1, 2]
这使得
I
、J
、V
更短,从而加快了tocsr
。现在,边缘是 "per cell"。我可以使用
numpy.unique
识别彼此相等的边,有效地节省了大约一半的I
、J
、V
。但是,我发现这也需要一些时间。 (不足为奇。)
我的另一个想法是,如果有 csr_matrix
-like 数据结构,其中主对角线分开保存。我知道这存在于其他一些软件包中,但在 scipy 中找不到。正确吗?
或许有一种直接构建CSR的明智方法?
我会尝试直接创建 csr 结构,特别是如果您要求助于 np.unique
,因为这会为您提供排序的键,这是完成工作的一半。
我假设你正处于 i, j
按字典顺序排序并重叠 v
使用 np.add.at
对 [= 的可选 inverse
输出求和的位置12=].
那么v
和j
已经是csr格式了。剩下要做的就是创建 indptr
,您只需通过 np.searchsorted(i, np.arange(M+1))
获得,其中 M
是列长度。您可以将这些直接传递给 sparse.csr_matrix
构造函数。
好了,让代码说话:
import numpy as np
from scipy import sparse
from timeit import timeit
def tocsr(I, J, E, N):
n = len(I)
K = np.empty((n,), dtype=np.int64)
K.view(np.int32).reshape(n, 2).T[...] = J, I
S = np.argsort(K)
KS = K[S]
steps = np.flatnonzero(np.r_[1, np.diff(KS)])
ED = np.add.reduceat(E[S], steps)
JD, ID = KS[steps].view(np.int32).reshape(-1, 2).T
ID = np.searchsorted(ID, np.arange(N+1))
return sparse.csr_matrix((ED, np.array(JD, dtype=int), ID), (N, N))
def viacoo(I, J, E, N):
return sparse.coo_matrix((E, (I, J)), (N, N)).tocsr()
#testing and timing
# correctness
N = 1000
A = np.random.random((N, N)) < 0.001
I, J = np.where(A)
E = np.random.random((2, len(I)))
D = np.zeros((2,) + A.shape)
D[:, I, J] = E
D2 = tocsr(np.r_[I, I], np.r_[J, J], E.ravel(), N).A
print('correct:', np.allclose(D.sum(axis=0), D2))
# speed
N = 100000
K = 10
I, J = np.random.randint(0, N, (2, K*N))
E = np.random.random((2 * len(I),))
I, J, E = np.r_[I, I, J, J], np.r_[J, J, I, I], np.r_[E, E]
print('N:', N, ' -- nnz (with duplicates):', len(E))
print('direct: ', timeit('f(a,b,c,d)', number=10, globals={'f': tocsr, 'a': I, 'b': J, 'c': E, 'd': N}), 'secs for 10 iterations')
print('via coo:', timeit('f(a,b,c,d)', number=10, globals={'f': viacoo, 'a': I, 'b': J, 'c': E, 'd': N}), 'secs for 10 iterations')
打印:
correct: True
N: 100000 -- nnz (with duplicates): 4000000
direct: 7.702431229001377 secs for 10 iterations
via coo: 41.813509466010146 secs for 10 iterations
加速:5 倍
所以,最后证明这是 COO 和 CSR 之间的区别 sum_duplicates
(就像@hpaulj 怀疑的那样)。感谢这里涉及的每个人(特别是@paul-panzer)的努力,a PR 正在为 tocsr
提供巨大的加速。
SciPy 的 tocsr
在 (I, J)
上做了一个 lexsort
,所以它有助于以 (I, J)
出来的方式组织索引已经很好地排序了。
对于上例中的nx=4
、ny=2
,I
和J
是
[1 6 3 5 2 7 5 5 7 4 5 6 0 2 2 0 1 2 1 6 3 5 2 7 5 5 7 4 5 6 0 2 2 0 1 2 5 5 7 4 5 6 0 2 2 0 1 2 1 6 3 5 2 7 5 5 7 4 5 6 0 2 2 0 1 2 1 6 3 5 2 7]
[1 6 3 5 2 7 5 5 7 4 5 6 0 2 2 0 1 2 5 5 7 4 5 6 0 2 2 0 1 2 1 6 3 5 2 7 1 6 3 5 2 7 5 5 7 4 5 6 0 2 2 0 1 2 5 5 7 4 5 6 0 2 2 0 1 2 1 6 3 5 2 7]
首先对 cells
的每一行进行排序,然后按第一列对行进行排序
cells = numpy.sort(cells, axis=1)
cells = cells[cells[:, 0].argsort()]
产生
[1 4 2 5 3 6 5 5 5 6 7 7 0 0 1 2 2 2 1 4 2 5 3 6 5 5 5 6 7 7 0 0 1 2 2 2 5 5 5 6 7 7 0 0 1 2 2 2 1 4 2 5 3 6 5 5 5 6 7 7 0 0 1 2 2 2 1 4 2 5 3 6]
[1 4 2 5 3 6 5 5 5 6 7 7 0 0 1 2 2 2 5 5 5 6 7 7 0 0 1 2 2 2 1 4 2 5 3 6 1 4 2 5 3 6 5 5 5 6 7 7 0 0 1 2 2 2 5 5 5 6 7 7 0 0 1 2 2 2 1 4 2 5 3 6]
对于原始 post 中的数字,排序将运行时间从大约 40 秒减少到 8 秒。
如果首先对节点进行更适当的编号,也许可以实现更好的排序。我在考虑 Cuthill-McKee and friends。