Rcpp Parallel 或 openmp for matrixvector product

Rcpp Parallel or openmp for matrixvector product

我正在尝试编写共轭梯度的朴素并行版本,所以我从简单的维基百科算法开始,我想通过适当的并行版本更改 dot-productsMatrixVector 产品, Rcppparallel 文档中有 dot-product using parallelReduce 的代码;我想我会为我的代码使用那个版本,但我正在尝试进行 MatrixVector 乘法运算,但与 R base(无并行)

相比,我没有取得好的结果

并行矩阵乘法的一些版本:使用 OpenMP、Rcppparallel、串行版本、带有犰狳的串行版本和基准测试

// [[Rcpp::depends(RcppParallel)]]
#include <Rcpp.h>
#include <RcppParallel.h>
#include <numeric>
// #include <cstddef>
// #include <cstdio>
#include <iostream>
using namespace RcppParallel;
using namespace Rcpp;

struct InnerProduct : public Worker
{   
   // source vectors
   const RVector<double> x;
   const RVector<double> y;

   // product that I have accumulated
   double product;

   // constructors
   InnerProduct(const NumericVector x, const NumericVector y) 
      : x(x), y(y), product(0) {}
   InnerProduct(const InnerProduct& innerProduct, Split) 
      : x(innerProduct.x), y(innerProduct.y), product(0) {}

   // process just the elements of the range I've been asked to
   void operator()(std::size_t begin, std::size_t end) {
      product += std::inner_product(x.begin() + begin, 
                                    x.begin() + end, 
                                    y.begin() + begin, 
                                    0.0);
   }

   // join my value with that of another InnerProduct
   void join(const InnerProduct& rhs) { 
     product += rhs.product; 
   }
};

struct MatrixMultiplication : public Worker
{
   // source matrix
   const RMatrix<double> A;

    //source vector
   const RVector<double> x;

   // destination matrix
   RMatrix<double> out;

   // initialize with source and destination
   MatrixMultiplication(const NumericMatrix A, const NumericVector x, NumericMatrix out) 
     : A(A), x(x), out(out) {}

   // take the square root of the range of elements requested
   void operator()(std::size_t begin, std::size_t end) {
      for (std::size_t i = begin; i < end; i++) {
            // rows we will operate on
            //RMatrix<double>::Row rowi = A.row(i);
            RMatrix<double>::Row rowi = A.row(i);

            //double res = std::inner_product(rowi.begin(), rowi.end(), x.begin(), 0.0);
            //Rcout << "res" << res << std::endl;
            out(i,1) = std::inner_product(rowi.begin(), rowi.end(), x.begin(), 0.0);
            //Rcout << "res" << out(i,1) << std::endl;
      }
    }  
};

// [[Rcpp::export]]
double parallelInnerProduct(NumericVector x, NumericVector y) {

   // declare the InnerProduct instance that takes a pointer to the vector data
   InnerProduct innerProduct(x, y);

   // call paralleReduce to start the work
   parallelReduce(0, x.length(), innerProduct);

   // return the computed product
   return innerProduct.product;
}
//librar(Rbenchmark)

// [[Rcpp::export]]
NumericVector matrixXvectorRcppParallel(NumericMatrix A, NumericVector x) {

   // // declare the InnerProduct instance that takes a pointer to the vector data
   // InnerProduct innerProduct(x, y);
   int nrows = A.nrow();
   NumericVector out(nrows);
   for(int i = 0; i< nrows;i++ )
   {
      out(i) = parallelInnerProduct(A(i,_),x);
   }
   // return the computed product
   return out;
}

// [[Rcpp::export]]
arma::rowvec matrixXvectorParallel(arma::mat A, arma::colvec x){
    arma::rowvec y = A.row(0)*0;
    int filas = A.n_rows;
    int columnas = A.n_cols;
    #pragma omp parallel for
    for(int j=0;j<columnas;j++)
    {
        //y(j) = A.row(j)*x(j))
        y(j) = dotproduct(A.row(j),x);
    }
    return y;
} 

arma::mat matrixXvector2(arma::mat A, arma::mat x){
  //arma::rowvec y = A.row(0)*0;
  //y=A*x;
  return A*x;
}

arma::rowvec matrixXvectorParallel2(arma::mat A, arma::colvec x){
    arma::rowvec y = A.row(0)*0;
    int filas = A.n_rows;
    int columnas = A.n_cols;

 #pragma omp parallel for
    for(int j = 0; j < columnas ; j++){
        double result = 0;
        for(int i = 0; i < filas; i++){
                result += x(i)*A(j,i);   
        }
        y(j) = result;
    }
    return y;
}

基准

                             test replications elapsed relative user.self sys.self user.child sys.child
1                         M %*% a           20   0.026    1.000     0.140    0.060          0         0
2 matrixXvector2(M, as.matrix(a))           20   0.040    1.538     0.101    0.217          0         0
4    matrixXvectorParallel2(M, a)           20   0.063    2.423     0.481    0.000          0         0
3     matrixXvectorParallel(M, a)           20   0.146    5.615     0.745    0.398          0         0
5 matrixXvectorRcppParallel(M, a)           20   0.335   12.885     2.305    0.079          0         0

我目前的最后一次尝试是将 parallefor 与 Rcppparallel 一起使用,但我遇到了内存错误,我不知道问题出在哪里

// [[Rcpp::export]]
NumericVector matrixXvectorRcppParallel2(NumericMatrix A, NumericVector x) {

   // // declare the InnerProduct instance that takes a pointer to the vector data
   int nrows = A.nrow();
   NumericMatrix out(nrows,1); //allocar mempria de vector de salida
   //crear worker
   MatrixMultiplication matrixMultiplication(A, x, out);


   parallelFor(0,A.nrow(),matrixMultiplication);

   // return the computed product
   return out;
}    

我注意到,当我使用 htop 检查我的终端时,处理器是如何工作的,当我使用 R-base 应用传统的矩阵向量乘法时,我在 htop 中看到,它使用了所有处理器,所以矩阵乘法默认并行执行?因为理论上,如果是串行版本,应该只有一个处理器可以工作。

如果有人知道哪个是更好的路径,OpenMP 或 Rcppparallel,或其他方式,那会给我比 R-base 的明显串行版本更好的性能。

此时共轭梯度序列号

// [[Rcpp::export]]
arma::colvec ConjugateGradient(arma::mat A, arma::colvec xini, arma::colvec b, int num_iteraciones){
    //arma::colvec xnew = xini*0 //inicializar en 0's
    arma::colvec x= xini; //inicializar en 0's
    arma::colvec rkold = b - A*xini;
    arma::colvec rknew = b*0;
    arma::colvec pk = rkold;
    int k=0;
    double alpha_k=0;
    double betak=0;
    double normak = 0.0;

    for(k=0; k<num_iteraciones;k++){
         Rcout << "iteracion numero " << k << std::endl;
        alpha_k =  sum(rkold.t() * rkold) / sum(pk.t()*A*pk); //sum de un elemento para realizar casting
        (pk.t()*A*pk);
        x = x+ alpha_k * pk;
        rknew = rkold - alpha_k*A*pk;
        normak =  sum(rknew.t()*rknew);
        if( normak < 0.000001){
            break;
        }
        betak = sum(rknew.t()*rknew) / sum( rkold.t() * rkold );

        //actualizar valores para siguiente iteracion
        pk = rknew + betak*pk;
        rkold = rknew;

    }

    return x;

}

我不知道在 R 中使用 BLAS,感谢 Hong Ooi 和 tim18,所以新的基准使用选项(matprod="internal")和选项(matprod="blas")

options(matprod = "internal")
res<-benchmark(M%*%a,matrixXvector2(M,as.matrix(a)),matrixXvectorParallel(M,a),matrixXvectorParallel2(M,a),matrixXvectorRcppParallel(M,a),order="relative",replications = 20)
res

                             test replications elapsed relative user.self sys.self user.child sys.child
2 matrixXvector2(M, as.matrix(a))           20   0.043    1.000     0.107    0.228          0         0
4    matrixXvectorParallel2(M, a)           20   0.069    1.605     0.530    0.000          0         0
1                         M %*% a           20   0.072    1.674     0.071    0.000          0         0
3     matrixXvectorParallel(M, a)           20   0.140    3.256     0.746    0.346          0         0
5 matrixXvectorRcppParallel(M, a)           20   0.343    7.977     2.272    0.175          0         0

选项(matprod="blas")

options(matprod = "blas")

res<-benchmark(M%*%a,matrixXvector2(M,as.matrix(a)),matrixXvectorParallel(M,a),matrixXvectorParallel2(M,a),matrixXvectorRcppParallel(M,a),order="relative",replications = 20)
res
                             test replications elapsed relative user.self sys.self user.child sys.child
1                         M %*% a           20   0.021    1.000     0.093    0.054          0         0
2 matrixXvector2(M, as.matrix(a))           20   0.092    4.381     0.177    0.464          0         0
5 matrixXvectorRcppParallel(M, a)           20   0.328   15.619     2.143    0.109          0         0
4    matrixXvectorParallel2(M, a)           20   0.438   20.857     3.036    0.000          0         0
3     matrixXvectorParallel(M, a)           20   0.546   26.000     3.667    0.127          0         0

如您所知,如果使用 multi-threaded BLAS 实现,基本 R 矩阵乘法可以是 multi-threaded。 rocker/* docker 图像就是这种情况,通常使用 OpenBLAS。

此外,(Rcpp)Armadillo 已经使用了 R 使用的 BLAS 库(在本例中 multi-threaded OpenBLAS)以及 OpenMP。所以你的 "serial" 版本实际上是 multi-threaded。您可以在 htop 中使用足够大的矩阵作为输入来验证这一点。

顺便说一句,在我看来,你正在尝试做的事情 premature optimization