使用 OpenMP atomic 并行更新矩阵列

Parallel update of matrix columns using OpenMP atomic

我有一小段代码,基本上可以执行简单的收集操作,我正在尝试使用 OpenMP 使其成为多线程:

Eigen::MatrixXf methodA(const Eigen::MatrixXf& in, const Eigen::MatrixXi& offset, const Eigen::MatrixXf& weight) {
  assert(offset.size() == weight.size());
  Eigen::MatrixXf values(in.rows(), 2000);

  #pragma omp parallel for
  for (int j = 0; j < in.cols(); ++j) {

    for (int i = 0; i < offset.rows(); ++i) {
      int o = offset(i, j); // guaranteed to index between 0 - in.cols()
      float w = weight(i, j);

      // Update column o i.e. values.col(o) += w * in.col(j);
      for (int k = 0; k < in.rows(); ++k) {
        #pragma omp atomic
        values(k, o) += w * in(k, j);
      }
    }
  }
  return values;
}

矩阵的典型维度是:

in =  10 x N
offset, weight :  6 X N
Here N is very large like 20 million.

由于存在竞争条件,我使用了原子操作。然而,这导致代码比没有 openMP #pragmas 的串行代码慢得多(慢 2-3 倍)。

运行 的完整代码可以找到 here

除了使用简单的 use parallel for 之外,我在多线程编程方面没有任何经验。那么我在这里做错了什么?

我也可以使用非 OpenMP 解决方案,例如 TBB。

您有一个主要问题,线程彼此等待。原子操作导致所有其他线程等待,直到当前线程完成操作。在我的计算机上花费的 2.22 秒中,超过 2 秒用于等待其他线程完成。在这个简单的例子中,最简单的方法是给每个线程它自己的矩阵副本来处理并在之后对它们求和。我意识到内存需求急剧增加并且可能不相关,但为了证明这一点,我在您的代码中添加了一个 methodD 并将计时器更改为使用 chrono:

#include <Eigen/Core>
#include <chrono>
#include <random>
#include <iostream>


Eigen::MatrixXf methodB(const Eigen::MatrixXf& in, const Eigen::MatrixXi& offset, const Eigen::MatrixXf& weight) {
    assert(offset.size() == weight.size());
    Eigen::MatrixXf values(in.rows(), 2000);
    for (int j = 0; j < in.cols(); ++j) {

        for (int i = 0; i < offset.rows(); ++i) {
            int o = offset(i, j);
            float w = weight(i, j);

            // Update column o i.e. values.col(o) += w * in.col(j);
            for (int k = 0; k < in.rows(); ++k)
                values(k, o) += w * in(k, j);
        }
    }
    return values;
}

Eigen::MatrixXf methodC(const Eigen::MatrixXf& in, const Eigen::MatrixXi& offset, const Eigen::MatrixXf& weight) {
    assert(offset.size() == weight.size());
    Eigen::MatrixXf values(in.rows(), 2000);

#pragma omp parallel for
    for (int j = 0; j < in.cols(); ++j) {

        for (int i = 0; i < offset.rows(); ++i) {
            int o = offset(i, j);
            float w = weight(i, j);

            // Update column o i.e. values.col(o) += w * in.col(j);
            for (int k = 0; k < in.rows(); ++k)
#pragma omp atomic
                values(k, o) += w * in(k, j);
        }
    }
    return values;
}
Eigen::MatrixXf methodD(const Eigen::MatrixXf& in, const Eigen::MatrixXi& offset, const Eigen::MatrixXf& weight) {
    assert(offset.size() == weight.size());
    Eigen::MatrixXf global_values(in.rows(), 2000);

#pragma omp parallel
    {
        Eigen::MatrixXf values(in.rows(), 2000);
#pragma omp for
        for (int j = 0; j < in.cols(); ++j) {

            for (int i = 0; i < offset.rows(); ++i) {
                int o = offset(i, j);
                float w = weight(i, j);

                // Update column o i.e. values.col(o) += w * in.col(j);
                for (int k = 0; k < in.rows(); ++k)
                    values(k, o) += w * in(k, j);
            }
        }

#pragma omp critical
        {
            global_values += values;
        }
    }
    return global_values;
}

int main(int argc, char const *argv[]) {
    typedef std::chrono::high_resolution_clock TimerType;

    Eigen::initParallel();

    int N = 960 * 720 * 20;

    Eigen::MatrixXf in(11, N);
    in.setRandom();

    Eigen::MatrixXf weight(6, N);
    in.setRandom();

    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<> dis(0, 1999);

    Eigen::MatrixXf outB, outC, outD;

    Eigen::MatrixXi offset(6, N);
    for (int i = 0; i < offset.size(); i++)
        offset(i) = dis(gen);


    {
        auto tb = TimerType::now() ;
        outB = methodB(in, offset, weight);
        auto te = TimerType::now();
        std::cout << "Total Time (methodB) " << 
            std::chrono::duration_cast<std::chrono::milliseconds>(te - tb).count()
            << " milliseconds.\n";
    }

    {
        auto tb = TimerType::now();
        outC = methodC(in, offset, weight);
        auto te = TimerType::now();
        std::cout << "Total Time (methodC) " <<
            std::chrono::duration_cast<std::chrono::milliseconds>(te - tb).count()
            << " milliseconds.\n";
    }

    {
        auto tb = TimerType::now();
        outD = methodD(in, offset, weight);
        auto te = TimerType::now();
        std::cout << "Total Time (methodD) " <<
            std::chrono::duration_cast<std::chrono::milliseconds>(te - tb).count()
            << " milliseconds.\n";
    }

    std::cout << "sum|B-C| = " << (outB - outC).cwiseAbs().sum() << std::endl;
    std::cout << "sum|B-D| = " << (outB - outD).cwiseAbs().sum() << std::endl;

    return 0;
}

这给了我:

Total Time (methodB) 2006 milliseconds.
Total Time (methodC) 3469 milliseconds.
Total Time (methodD) 366 milliseconds.
sum|B-C| = 0
sum|B-D| = 1.10737e-037

我还没有资格发表评论,但 Avi Ginsburg 的(非常好!)回答包含一些小错误。另外,OP很可能在代码中有错误。

第一个错误:在声明 weight(6, N) 后,矩阵 'in' 被设置为零。这应该是权重。否则,所有输出矩阵都将完全为零。这就是 Avi Ginsburg 的答案有效的原因。

第二个错误:在 Avi Ginsberg 的方法 D 中,矩阵 global_values 和值未初始化为零。这导致未定义的输出。事实上,对我来说,当使用 openmp 时,methodD 给出了错误的结果。以下代码修复了所有这些问题。编译:

g++ filename.cpp -o execname -I/path/to/eigen3 -O2 -fopenmp

#include <Eigen/Core>
#include <chrono>
#include <random>
#include <iostream>

Eigen::MatrixXf methodA(const Eigen::MatrixXf& in, const Eigen::MatrixXi& offset, const Eigen::MatrixXf& weight) {
  assert(offset.size() == weight.size());
  Eigen::MatrixXf values(in.rows(), 2000);

  #pragma omp parallel for
  for (int j = 0; j < in.cols(); ++j) {

    for (int i = 0; i < offset.rows(); ++i) {
      int o = offset(i, j); // guaranteed to index between 0 - in.cols()
      float w = weight(i, j);

      // Update column o i.e. values.col(o) += w * in.col(j);
      for (int k = 0; k < in.rows(); ++k) {
        #pragma omp atomic
        values(k, o) += w * in(k, j);
      }
    }
  }
  return values;
}

Eigen::MatrixXf methodB(const Eigen::MatrixXf& in, const Eigen::MatrixXi& offset, const Eigen::MatrixXf& weight) {
    assert(offset.size() == weight.size());
    Eigen::MatrixXf values(in.rows(), 2000);
    for (int j = 0; j < in.cols(); ++j) {

        for (int i = 0; i < offset.rows(); ++i) {
            int o = offset(i, j);
            float w = weight(i, j);

            // Update column o i.e. values.col(o) += w * in.col(j);
            for (int k = 0; k < in.rows(); ++k)
                values(k, o) += w * in(k, j);
        }
    }
    return values;
}

Eigen::MatrixXf methodC(const Eigen::MatrixXf& in, const Eigen::MatrixXi& offset, const Eigen::MatrixXf& weight) {
    assert(offset.size() == weight.size());
    Eigen::MatrixXf values(in.rows(), 2000);

#pragma omp parallel for
    for (int j = 0; j < in.cols(); ++j) {

        for (int i = 0; i < offset.rows(); ++i) {
            int o = offset(i, j);
            float w = weight(i, j);

            // Update column o i.e. values.col(o) += w * in.col(j);
            for (int k = 0; k < in.rows(); ++k)
#pragma omp atomic
                values(k, o) += w * in(k, j);
        }
    }
    return values;
}
Eigen::MatrixXf methodD(const Eigen::MatrixXf& in, const Eigen::MatrixXi& offset, const Eigen::MatrixXf& weight) {
    assert(offset.size() == weight.size());
    Eigen::MatrixXf global_values = Eigen::MatrixXf::Zero(in.rows(), 2000);

#pragma omp parallel default(none)
    {
        Eigen::MatrixXf values = Eigen::MatrixXf::Zero(in.rows(), 2000);
#pragma omp for
        for (int j = 0; j < in.cols(); ++j) {

            for (int i = 0; i < offset.rows(); ++i) {
                int o = offset(i, j);
                float w = weight(i, j);

                // Update column o i.e. values.col(o) += w * in.col(j);
                for (int k = 0; k < in.rows(); ++k)
                    values(k, o) += w * in(k, j);
            }
        }

#pragma omp critical
        {
            global_values += values;
        }
    }
    return global_values;
}

int main(int argc, char const *argv[]) {
    typedef std::chrono::high_resolution_clock TimerType;

    Eigen::initParallel();

    int N = 960 * 720 * 20;
    float norm;

    Eigen::MatrixXf in(11, N);
    in.setRandom();

    Eigen::MatrixXf weight(6, N);
    weight.setRandom();

    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<> dis(0, 1999);

    Eigen::MatrixXf outA, outB, outC, outD;

    Eigen::MatrixXi offset(6, N);
    for (int i = 0; i < offset.size(); i++)
        offset(i) = dis(gen);

    {
        auto tb = TimerType::now() ;
        outA = methodA(in, offset, weight);
        auto te = TimerType::now();
        std::cout << "Total Time (methodA) " << 
            std::chrono::duration_cast<std::chrono::milliseconds>(te - tb).count()
            << " milliseconds.\n";
    }

    {
        auto tb = TimerType::now() ;
        outB = methodB(in, offset, weight);
        auto te = TimerType::now();
        std::cout << "Total Time (methodB) " << 
            std::chrono::duration_cast<std::chrono::milliseconds>(te - tb).count()
            << " milliseconds.\n";
    }

    {
        auto tb = TimerType::now();
        outC = methodC(in, offset, weight);
        auto te = TimerType::now();
        std::cout << "Total Time (methodC) " <<
            std::chrono::duration_cast<std::chrono::milliseconds>(te - tb).count()
            << " milliseconds.\n";
    }

    {
        auto tb = TimerType::now();
        outD = methodD(in, offset, weight);
        auto te = TimerType::now();
        std::cout << "Total Time (methodD) " <<
            std::chrono::duration_cast<std::chrono::milliseconds>(te - tb).count()
            << " milliseconds.\n";
    }

    norm = outB.cwiseAbs().sum();

    std::cout << "sum|A-B| = " << (outA - outB).cwiseAbs().sum() / norm << std::endl;
    std::cout << "sum|B-C| = " << (outB - outC).cwiseAbs().sum() / norm << std::endl;
    std::cout << "sum|C-D| = " << (outC - outD).cwiseAbs().sum() / norm << std::endl;
    std::cout << "sum|A-C| = " << (outA - outC).cwiseAbs().sum() / norm << std::endl;
    std::cout << "sum|B-D| = " << (outB - outD).cwiseAbs().sum() / norm << std::endl;
    std::cout << "sum|A-D| = " << (outA - outD).cwiseAbs().sum() / norm << std::endl;

    return 0;
}