如何在 Fortran mex 文件中实现 A = sparse(I, J, K) (来自三元组的稀疏矩阵)?

How to implement A = sparse(I, J, K) (sparse matrix from triplet) in a Fortran mex file?

我正在尝试通过 mex 函数(用 Fortran 编写)在 Matlab 中创建一个稀疏方阵。我想要 A = sparse(I,J,K) 之类的东西。我的三胞胎是这样的,条目之间有重复

femi = [1, 2, 3, 2, 2, 4, 5, 5, 4, 6, 6, 5, 5, 2]
femj = [2, 2, 1, 1, 1, 3, 3, 6, 3, 1, 1, 2, 2, 4]
femk = [2, 1, 5, 4, 2, 4, 5, 7, 2, 1, 6, 2, 1, 4]

我写了一段粗略的代码,它适用于小矩阵维度,但它比内在的 Matlab sparse 慢得多。由于我几乎没有编码背景,所以我不知道自己做错了什么(分配变量的方式错误?循环太多?)。任何帮助表示赞赏。谢谢你。这是 mex 计算子例程。它returns pr, ir, jc 索引数组给稀疏矩阵

subroutine new_sparse(femi, femj, femk, pr, ir, jc, n, m)

      implicit none
      intrinsic:: SUM, COUNT, ANY
      integer :: i, j, k, n, indjc, m
      real*8  :: femi(n), femj(n), femk(n)
      real*8  :: pr(n)
      integer :: ir(n),jc(m+1)
      logical :: indices(n)

      indices = .false.
      k = 1
      indjc = 0
      jc(1) = 0
      do j=1,m
            do i =1,m
                indices = [femi==i .and. femj==j]
                if (ANY(indices .eqv. .true.)) then                                     
                    ir(k) = i-1
                    pr(k) = SUM(femk, indices)
                    k = k+1
                    indjc = indjc + 1
                end if
            end do
            if (indjc/=0) then
                jc(j+1) = jc(j) + indjc
                indjc = 0
            else
                jc(j+1) = jc(j) 
            end if
      end do


      return      
      end

编辑:
正如用户@jack 和@veryreverie 在下面的评论中所建议的,最好直接排序femifemjfemk。我猜想 ranking/sorting femi 首先(并根据 femifemjfemk 进行排序)然后 ranking/sorting femj (并且根据 femj) 对 femifemk 进行排序提供了所需的结果。剩下的就是处理重复了。

编辑#2:
我逐行翻译了Engblom and Lukarksi 的C代码序列化版本。这篇文档把他们的推理解释得很清楚,我觉得对我这样的初学者很有用。但是,由于我的经验不足,我无法翻译并行版本的代码。也许这会引发另一个问题。

    subroutine new_sparse(ir, jcS, pr, MatI, MatJ, MatK, n, m)

! use omp_lib

implicit none
integer,  parameter   :: dp = selected_real_kind(15,300)
integer,  intent(in)  :: n, m 
real(dp), intent(in)  :: MatK(n),     MatI(n),     MatJ(n)
! integer*8,  intent(out) :: nnew
integer ::  i, k, col, row, c, r !, nthreads
integer ::  hcol(m+1), jcS(m+1), jrS(m+1)
integer ::  ixijs, irank(n), rank(n)
real*8  ::  pr(*)
integer ::  ir(*)

hcol = 0
jcS  = 0
jrS  = 0
    
do i = 1,n
    jrS(MatI(i)+1) = jrS(MatI(i)+1)+1
end do
do r = 2,m+1
    jrS(r) = jrS(r) + jrS(r-1)
end do

do  i = 1,n 
     rank(jrS(MatI(i))+1) = i
     jrS(MatI(i)) = jrS(MatI(i)) + 1
end do

k = 1
do row = 1,m
    do i = k , jrS(row) 
        ixijs = rank(i)
        col = MatJ(ixijs)
        if (hcol(col) < row) then
            hcol(col) = row
            jcS(col+1) = jcS(col+1)+1
        end if
        irank(ixijs) = jcS(col+1)
        k = k+1
    end do
end do

do c = 2,m+1  
    jcS(c) = jcS(c) + jcS(c-1)
end do

do i = 1,n
    irank(i) = irank(i) + jcS(MatJ(i))
end do    

ir(irank) = MatI-1
do i = 1,n
    pr(irank(i)) = pr(irank(i)) +  MatK(i)
end do

return    
end

这应该有效:

module test
  implicit none
  
  ! This should probably be whatever floating point format Matlab uses.
  integer, parameter :: dp = selected_real_kind(15,300)
contains
subroutine new_sparse(femi, femj, femk, pr, ir, jc, n, m)
  integer, intent(in) :: n ! The size of femi, femj, femk.
  integer, intent(in) :: m ! The no. of rows (and cols) in the matrix.
  integer, intent(in) :: femi(n) ! The input i indices.
  integer, intent(in) :: femj(n) ! The input j indices.
  real(dp), intent(in) :: femk(n) ! The input values.
  real(dp), intent(out) :: pr(n) ! The output values.
  integer, intent(out) :: ir(n) ! The output i indices.
  integer, intent(out) :: jc(m+1) ! Column j has jc(j+1)-jc(j) non-zero entries
  
  ! loop indices.
  integer :: a,b
  
  ! Initialise jc.
  ! All elements of `jc` are `1` as the output initially contains no elements.
  jc = 1
  
  ! Loop over the input elements.
  do_a : do a=1,n
    associate(i=>femi(a), j=>femj(a), k=>femk(a))
      ! Loop over the stored entries in column j of the output,
      !    looking for element (i,j).
      do b=jc(j),jc(j+1)-1
        ! Element (i,j) is already in the output, update the output and cycle.
        if (ir(b)==i) then
          pr(b) = pr(b) + femk(a)
          cycle do_a
        endif
      enddo
      
      ! Element (i,j) is not already in the output.
      ! First make room for the new element in ir and pr,
      !    then add the element to ir and pr,
      !    then update jc.
      ir(jc(j+1)+1:jc(m+1)) = ir(jc(j+1):jc(m+1)-1)
      pr(jc(j+1)+1:jc(m+1)) = pr(jc(j+1):jc(m+1)-1)
      ir(jc(j+1)) = i
      pr(jc(j+1)) = k
      jc(j+1:) = jc(j+1:) + 1
    end associate
  enddo do_a
end subroutine
end module

program prog
  use test
  implicit none
  
  integer, parameter :: n = 14
  integer, parameter :: m = 6
  
  integer  :: femi(n), femj(n)
  real(dp) :: femk(n)
  
  real(dp) :: pr(n)
  integer  :: ir(n),jc(m+1)
  
  integer :: a,b
  
  femi = [1, 2, 3, 2, 2, 4, 5, 5, 4, 6, 6, 5, 5, 2]
  femj = [2, 2, 1, 1, 1, 3, 3, 6, 3, 1, 1, 2, 2, 4]
  femk = real([2, 1, 5, 4, 2, 4, 5, 7, 2, 1, 6, 2, 1, 4], dp)
  
  write(*,*) 'Input:'
  do a=1,n
    write(*,'(a,i0,a,i0,a,f2.0)') '(',femi(a),',',femj(a),') : ',femk(a)
  enddo
  
  write(*,*)
  
  call new_sparse(femi,femj,femk,pr,ir,jc,n,m)
  
  write(*,*) 'Output:'
  do a=1,m
    do b=jc(a),jc(a+1)-1
      write(*,'(a,i0,a,i0,a,f2.0)') '(',ir(b),',',a,') : ',pr(b)
    enddo
  enddo
end program

这里写道:

Input:
(1,2) : 2.
(2,2) : 1.
(3,1) : 5.
(2,1) : 4.
(2,1) : 2.
(4,3) : 4.
(5,3) : 5.
(5,6) : 7.
(4,3) : 2.
(6,1) : 1.
(6,1) : 6.
(5,2) : 2.
(5,2) : 1.
(2,4) : 4.

Output:
(3,1) : 5.
(2,1) : 6.
(6,1) : 7.
(1,2) : 2.
(2,2) : 1.
(5,2) : 3.
(4,3) : 6.
(5,3) : 5.
(2,4) : 4.
(5,6) : 7.

您算法的瓶颈来自指令 indices = [femi==i .and. femj==j]any(indices .eqv. .true.)sum(femk, indices)。这些都需要 O(n) 操作,并且由于它们在双循环中,因此子例程的总成本是 O(m^2*n).

我的算法分两个阶段进行。第一阶段,do b=jc(j),jc(j+1)-1 循环,将输入中的每个元素与输出的匹配列中的每个元素进行比较,最大成本为 O(mn) 操作。如果在输出中找到输入元素,则更新该值,无需执行任何其他操作。

如果在输出中找不到输入元素,则需要将其添加到输出中。这是由第二阶段处理的,即 do b... 循环之后的代码。由于这需要移动输出元素以便为新元素生成 space,因此此阶段最多有 O(n'^2) 操作,其中 n' 是输入中唯一元素的数量,对于稀疏矩阵应该满足 n'<=nn'<<m^2

对于大型 mn,我的算法应该 运行 快很多,但它肯定有很大的改进空间。我怀疑使用中间数据结构来存储 irpr 是值得的,这样就可以插入新元素而不必重新排列所有元素。