Vector sum calculation in C++ - Parallel code slower than serial
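I am trying to write multithreaded code that sums the elements of a vector.
The code is simple:
- the threads are defined via a vector of threads;
- the number of threads is defined by the ThreadsSize variable;
- with ThreadsSize equal to 1, the sum completes in about 300 ms, while with 8 threads it takes about 1200 ms.
I am using 8-core hardware.
Can someone explain why this happens? Theoretically I would expect about 300/8 ms when 8 threads are used. Is that not correct?
Here is the code: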
#include <iostream>
#include "math.h"
#include <vector>
#include <thread>
#include <mutex>
#include <chrono>
using namespace std;
/* Parallel sum function */
void Function_Sum(mutex& Mutex, vector<double>& Vector, int unsigned kin, int unsigned kend, double& Sum)
{
for(int unsigned k =kin; k <= kend; k = k + 1)
{
Mutex.lock();
Sum = Sum + Vector[k];
Mutex.unlock();
}
}
/* Main function */
int main(int argc, char *argv[])
{
// Threads and mutex initialization
int unsigned ThreadsSize = 1;
vector<thread> Threads;
mutex Mutex;
// Vector definition
vector<double> Vector(10000000,1);
// Indexes initialization
int unsigned kin, kend;
int unsigned dk = floor(Vector.size() / ThreadsSize);
// Output 1
cout << "VectorSize = " << Vector.size() << ", ThreadsSize = " << ThreadsSize << ", dk = " << dk << endl;
// Parallel sum
auto t_start = std::chrono::high_resolution_clock::now();
double Sum = 0;
for(int unsigned k = 0; k <= ThreadsSize - 1; k = k + 1)
{
kin = k * dk;
kend = (k + 1) * dk - 1;
if(k == ThreadsSize - 1)
{
kend = Vector.size() - 1;
}
cout << k << " in: " << kin << ", end: " << kend << endl;
Threads.push_back(thread(Function_Sum, ref(Mutex), ref(Vector), kin, kend, ref(Sum)));
}
// Threads joining
for(int unsigned k = 0; k <= ThreadsSize - 1 ; k = k + 1)
{
Threads[k].join();
}
// Elapsed time calculation
auto t_end = std::chrono::high_resolution_clock::now();
double elapsed_time_ms = std::chrono::duration<double, std::milli>(t_end-t_start).count();
// Output 2
cout << "Sum = " << Sum << endl;
cout << "Time = " << elapsed_time_ms << endl;
}
Thanks in advance.
Can someone explain why this happens? Theoretically I would expect to have 300/8 ms in case 8 threads are used. Is it not correct?
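Theoretically, you could get something close to 300/8 ms (plus the overhead of the threads), but the way you use the mutex completely prevents any parallelization: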
Mutex.lock();
Sum = Sum + Vector[k];
Mutex.unlock();
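What you are doing here is:
- request the lock, so that every other thread has to wait until the mutex is unlocked again
- do Sum = Sum + Vector[k];
- release the lock so that another thread can acquire it
So none of the Sum = Sum + Vector[k]; additions are done in parallel: you still have the original 300 ms of work, plus the overhead of locking and unlocking the mutex once per element (ten million times here) while the threads contend for it.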
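What you want to do is split your array into 8 parts, sum those partitions in parallel with each thread using its own storage, and then add up the 8 per-thread results.
For this divide-and-conquer approach you don't even need a mutex.
You only need a container of size 8 in which each thread stores its result.
After the "join" loop, you can iterate over that container and add up the individual sums.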
Alternatively, you can use std::future, so your code would look like this:
#include <cmath>
#include <chrono>
#include <future>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
using namespace std;
/* Parallel sum function */
double Function_Sum(const vector<double> &Vector, int unsigned kin,
                    int unsigned kend) {
double Sum = 0;
for (int unsigned k = kin; k <= kend; k = k + 1) {
Sum = Sum + Vector[k];
}
return Sum;
}
/* Main function */
int main(int argc, char *argv[]) {
// Threads and mutex initialization
int unsigned ThreadsSize = 8;
vector<std::future<double>> Futures;
mutex Mutex;
// Vector definition
vector<double> Vector(10000000, 1);
// Indexes initialization
int unsigned kin, kend;
int unsigned dk = floor(Vector.size() / ThreadsSize);
// Output 1
cout << "VectorSize = " << Vector.size() << ", ThreadsSize = " << ThreadsSize
<< ", dk = " << dk << endl;
// Parallel sum
auto t_start = std::chrono::high_resolution_clock::now();
double Sum = 0;
for (int unsigned k = 0; k <= ThreadsSize - 1; k = k + 1) {
kin = k * dk;
kend = (k + 1) * dk - 1;
if (k == ThreadsSize - 1) {
kend = Vector.size() - 1;
}
cout << k << " in: " << kin << ", end: " << kend << endl;
Futures.push_back(std::async(std::launch::async, [&Vector, kin, kend]() {
return Function_Sum(Vector, kin, kend);
}));
}
// Collect the results (get() blocks until each task has finished)
for (int unsigned k = 0; k <= ThreadsSize - 1; k = k + 1) {
Sum += Futures[k].get();
}
// Elapsed time calculation
auto t_end = std::chrono::high_resolution_clock::now();
double elapsed_time_ms =
std::chrono::duration<double, std::milli>(t_end - t_start).count();
// Output 2
cout << "Sum = " << Sum << endl;
cout << "Time = " << elapsed_time_ms << endl;
return 0;
}
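Beyond that, you generally want to make use of other standard facilities such as std::accumulate, range-based for loops, and iterators. Also, you should not use using namespace std.
With that, your code could look like this: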
#include <chrono>
#include <cmath>
#include <future>
#include <iostream>
#include <iterator>
#include <numeric>
#include <thread>
#include <vector>
int main() {
unsigned int ThreadsSize = 8;
std::vector<std::future<double>> Futures;
// Vector definition
std::vector<double> Vector(10000000,1);
// Indexes initialization
unsigned int dk = Vector.size() / ThreadsSize; // integer division already rounds down
// Output 1
std::cout << "VectorSize = " << Vector.size()
<< ", ThreadsSize = " << ThreadsSize << ", dk = " << dk
<< std::endl;
// Parallel sum
auto t_start = std::chrono::high_resolution_clock::now();
auto currentIterator = Vector.begin();
for (unsigned int k = 0; k < ThreadsSize; k++) {
// save a copy of the current iterator
auto endIterator = currentIterator;
if (k == ThreadsSize - 1) {
// use the actual end iterator for the last thread
endIterator = Vector.end();
} else {
// advance the end iterator
std::advance(endIterator, dk);
}
// create an async task that returns a future
Futures.push_back(
std::async(std::launch::async, [currentIterator, endIterator]() {
// create the sum over the partition
return std::accumulate(currentIterator, endIterator, 0.0);
}));
currentIterator = endIterator;
}
// collect the results of the futures
double Sum = 0;
for (auto &future : Futures) {
Sum += future.get();
}
// Elapsed time calculation
auto t_end = std::chrono::high_resolution_clock::now();
double elapsed_time_ms =
std::chrono::duration<double, std::milli>(t_end - t_start).count();
// Output 2
std::cout << "Sum = " << Sum << std::endl;
std::cout << "Time = " << elapsed_time_ms << std::endl;
return 0;
}
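This small modification to Function_Sum gets you the speedup you want:
void Function_Sum(mutex& Mutex, vector<double>& Vector, int unsigned kin, int unsigned kend, double& Sum)
{
    double sum = 0.;  // thread-local partial sum, no locking inside the loop
    for(int unsigned k = kin; k <= kend; k = k + 1)
        sum += Vector[k];
    Mutex.lock();     // lock once per thread to publish the partial sum
    Sum += sum;
    Mutex.unlock();
}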
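The mutex is now locked once per thread instead of once per addition. If you want a simple explanation: locking and unlocking a mutex costs far more than a single addition.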