Vector sum calculation in C++ - Parallel code slower than serial

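I am trying to write multithreaded code that sums the elements of a vector.

The code is quite simple:

I am using 8-core hardware.

Can someone explain why this happens? In theory, I would expect about 300/8 ms when 8 threads are used. Is that not correct?

Here is the code: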

#include <iostream>
#include "math.h"
#include <vector>
#include <thread>
#include <mutex>
#include <chrono>

using namespace std;

/* Parallel sum function */
void Function_Sum(mutex& Mutex, vector<double>& Vector, int unsigned kin, int unsigned kend, double& Sum)
{
    for(int unsigned k =kin; k <= kend; k = k + 1)
    {
        Mutex.lock();
        Sum = Sum + Vector[k];
        Mutex.unlock();
    }
}

/* Main function */
int main(int argc, char *argv[])
{
    // Threads and mutex initialization
    int unsigned ThreadsSize = 1;
    vector<thread> Threads;
    mutex Mutex;
    
    // Vector definition
    vector<double> Vector(10000000,1);
    
    // Indexes initialization
    int unsigned kin, kend;
    int unsigned dk = floor(Vector.size() / ThreadsSize);
    
    // Output 1
    cout << "VectorSize = " << Vector.size() << ", ThreadsSize = " << ThreadsSize << ", dk = " << dk << endl;
    
    // Parallel sum
    auto t_start = std::chrono::high_resolution_clock::now();
    double Sum = 0;
    for(int unsigned k = 0; k <= ThreadsSize - 1; k = k + 1)
    {
        kin = k * dk;
        kend = (k + 1) * dk - 1;
        if(k == ThreadsSize - 1)
        {
            kend = Vector.size() - 1;
        }
        cout << k << " in: " << kin << ", end: " << kend << endl;
        
        Threads.push_back(thread(Function_Sum, ref(Mutex), ref(Vector), kin, kend, ref(Sum)));  
    }
    
    // Threads joining
    for(int unsigned k = 0; k <= ThreadsSize - 1 ; k = k + 1)
    {
        Threads[k].join();
    }
    
    // Elapsed time calculation
    auto t_end = std::chrono::high_resolution_clock::now();
    double elapsed_time_ms = std::chrono::duration<double, std::milli>(t_end-t_start).count();
    
    // Output 2
    cout << "Sum = " << Sum << endl;
    cout << "Time = " << elapsed_time_ms << endl;
}

Thanks in advance.

Can someone explain why this happens? Theoretically I would expect to have 300/8 ms in case 8 threads are used. Is it not correct?

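In theory you could get close to 300/8 ms (plus the overhead of creating and joining the threads).

But the way you use the mutex completely prevents any parallelization.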

Mutex.lock();
Sum = Sum + Vector[k];
Mutex.unlock();

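What you do here is:

  1. Acquire the lock, so that every other thread has to wait until the mutex is unlocked again
  2. Sum = Sum + Vector[k];
  3. Release the lock so that another thread can acquire it

So none of the Sum = Sum + Vector[k]; statements run in parallel. You still pay the original 300 ms, plus the overhead of locking and unlocking the mutex once per element.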

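What you want to do is split your array into 8 partitions, sum those partitions in parallel with each thread using its own storage, and then add up the results of the 8 threads.

For this divide-and-conquer approach you don't actually need a mutex at all.

You only need a container of size 8 in which each thread stores its own result.

After the "join" loop you can iterate over that container and add up the individual sums.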
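The container-based variant described above can be sketched as follows. This is a minimal sketch, not the code from the question: the name parallel_sum and its parameters are hypothetical, and it assumes that every thread writes only to its own slot of the result container, which is why no mutex is needed.

```cpp
#include <cstddef>
#include <numeric>
#include <thread>
#include <vector>

// Hypothetical helper (not part of the original post): sums `data` with
// `threadCount` threads. Each thread writes only to its own slot in `partial`,
// so no mutex is required.
double parallel_sum(const std::vector<double>& data, unsigned int threadCount) {
  if (threadCount == 0) threadCount = 1;

  std::vector<double> partial(threadCount, 0.0);  // one result slot per thread
  std::vector<std::thread> threads;
  threads.reserve(threadCount);

  const std::size_t chunk = data.size() / threadCount;
  for (unsigned int t = 0; t < threadCount; ++t) {
    const std::size_t begin = t * chunk;
    // The last thread also takes the remainder of the division.
    const std::size_t end =
        (t == threadCount - 1) ? data.size() : begin + chunk;

    threads.emplace_back([&data, &partial, t, begin, end] {
      double local = 0.0;  // accumulate in a thread-local variable
      for (std::size_t k = begin; k < end; ++k) {
        local += data[k];
      }
      partial[t] = local;  // single write to this thread's own slot
    });
  }

  // The "join" loop: wait for all threads to finish.
  for (auto& th : threads) {
    th.join();
  }

  // Add up the per-thread results.
  return std::accumulate(partial.begin(), partial.end(), 0.0);
}
```

Calling parallel_sum(Vector, 8) on the vector from the question should return 10000000. Accumulating in a local variable and writing the slot only once also keeps the threads from repeatedly writing to neighbouring elements of the result container.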

Alternatively, you can use std::future.

Your code would then look like this:

#include "math.h"
#include <chrono>
#include <future>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
using namespace std;

/* Parallel sum function */
double Function_Sum(vector<double> &Vector, int unsigned kin,
                    int unsigned kend) {
  double Sum = 0;
  for (int unsigned k = kin; k <= kend; k = k + 1) {
    Sum = Sum + Vector[k];
  }
  return Sum;
}

/* Main function */
int main(int argc, char *argv[]) {
  // Number of tasks and the futures that will hold their results
  // (no mutex is needed: each task returns its own partial sum)
  int unsigned ThreadsSize = 8;
  vector<std::future<double>> Futures;

  // Vector definition
  vector<double> Vector(10000000, 1);

  // Indexes initialization
  int unsigned kin, kend;
  int unsigned dk = floor(Vector.size() / ThreadsSize);

  // Output 1
  cout << "VectorSize = " << Vector.size() << ", ThreadsSize = " << ThreadsSize
       << ", dk = " << dk << endl;

  // Parallel sum
  auto t_start = std::chrono::high_resolution_clock::now();
  double Sum = 0;
  for (int unsigned k = 0; k <= ThreadsSize - 1; k = k + 1) {
    kin = k * dk;
    kend = (k + 1) * dk - 1;
    if (k == ThreadsSize - 1) {
      kend = Vector.size() - 1;
    }
    cout << k << " in: " << kin << ", end: " << kend << endl;

    Futures.push_back(std::async(std::launch::async, [&Vector, kin, kend]() {
      return Function_Sum(Vector, kin, kend);
    }));
  }

  // Collect the results; get() blocks until the corresponding task has finished
  for (int unsigned k = 0; k <= ThreadsSize - 1; k = k + 1) {
    Sum += Futures[k].get();
  }

  // Elapsed time calculation
  auto t_end = std::chrono::high_resolution_clock::now();
  double elapsed_time_ms =
      std::chrono::duration<double, std::milli>(t_end - t_start).count();

  // Output 2
  cout << "Sum = " << Sum << endl;
  cout << "Time = " << elapsed_time_ms << endl;

  return 0;
}

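Beyond that, you generally want to make use of other standard facilities such as std::accumulate, range-based for loops and iterators. You also should not use using namespace std.

With that, your code could look like this: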

#include <chrono>
#include <cmath>
#include <future>
#include <iostream>
#include <numeric>
#include <thread>
#include <vector>

int main() {
  unsigned int ThreadsSize = 8;
  std::vector<std::future<double>> Futures;

  // Vector definition
  std::vector<double> Vector(10000000,1);

  // Indexes initialization
  unsigned int dk = std::floor(Vector.size() / ThreadsSize);

  // Output 1
  std::cout << "VectorSize = " << Vector.size()
            << ", ThreadsSize = " << ThreadsSize << ", dk = " << dk
            << std::endl;

  // Parallel sum
  auto t_start = std::chrono::high_resolution_clock::now();

  auto currentIterator = Vector.begin();
  for (unsigned int k = 0; k < ThreadsSize; k++) {
    // save a copy of the current iterator
    auto endIterator = currentIterator;

    if (k == ThreadsSize - 1) {
      // use the actual end iterator for the last thread
      endIterator = Vector.end();
    } else {
      // advance the end iterator
      std::advance(endIterator, dk);
    }

    // create an async task that returns a future
    Futures.push_back(
        std::async(std::launch::async, [currentIterator, endIterator]() {
          // create the sum over the partition
          return std::accumulate(currentIterator, endIterator, 0.0);
        }));

    currentIterator = endIterator;
  }

  // collect the results of the futures
  double Sum = 0;
  for (auto &future : Futures) {
    Sum += future.get();
  }

  // Elapsed time calculation
  auto t_end = std::chrono::high_resolution_clock::now();
  double elapsed_time_ms =
      std::chrono::duration<double, std::milli>(t_end - t_start).count();

  // Output 2
  std::cout << "Sum = " << Sum << std::endl;
  std::cout << "Time = " << elapsed_time_ms << std::endl;

  return 0;
}

This small modification of Function_Sum gives you the speedup you want:

double sum = 0.;
for(int unsigned k =kin; k <= kend; k = k + 1)
    sum += Vector[k];
Mutex.lock();
Sum += sum;
Mutex.unlock();

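The Mutex is now locked once per thread instead of once per addition. If you want a simple explanation: locking and unlocking a mutex costs far more than an addition.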
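For completeness, here is how that modification might look as a whole function. This is a sketch under a few assumptions: it keeps the inclusive kin/kend convention from the question, uses std::lock_guard instead of explicit lock()/unlock() calls, and adds a hypothetical driver, Sum_With_Threads, which does not appear in the original post.

```cpp
#include <algorithm>
#include <cstddef>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

// Sums Vector[kin..kend] (inclusive, as in the question) into a local
// variable and takes the lock only once, to add the partial result to Sum.
void Function_Sum(std::mutex& Mutex, const std::vector<double>& Vector,
                  std::size_t kin, std::size_t kend, double& Sum) {
  double sum = 0.0;
  for (std::size_t k = kin; k <= kend; ++k) {
    sum += Vector[k];
  }
  std::lock_guard<std::mutex> lock(Mutex);  // unlocked automatically on return
  Sum += sum;
}

// Hypothetical driver (not in the original post): partitions the vector the
// same way the question does and runs Function_Sum on each partition.
double Sum_With_Threads(const std::vector<double>& Vector,
                        std::size_t ThreadsSize) {
  if (Vector.empty()) return 0.0;
  // Use at least one thread and never more threads than elements.
  ThreadsSize = std::max<std::size_t>(1, std::min(ThreadsSize, Vector.size()));

  std::mutex Mutex;
  double Sum = 0.0;
  std::vector<std::thread> Threads;
  const std::size_t dk = Vector.size() / ThreadsSize;

  for (std::size_t k = 0; k < ThreadsSize; ++k) {
    const std::size_t kin = k * dk;
    // The last thread runs up to the final element.
    const std::size_t kend =
        (k == ThreadsSize - 1) ? Vector.size() - 1 : (k + 1) * dk - 1;
    Threads.emplace_back(Function_Sum, std::ref(Mutex), std::cref(Vector),
                         kin, kend, std::ref(Sum));
  }
  for (auto& t : Threads) {
    t.join();
  }
  return Sum;
}
```

With 8 threads the mutex is acquired 8 times in total rather than once per element, so the time spent on locking becomes negligible compared with the additions.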