C++ 并发队列:线程数大于 1 时速度较慢
C++ Concurrent Queue: Slower with > 1 threads
队列并发有两种不同的锁:一种用于enqueue()以保护多个线程同时入队,另一种用于dequeue()以达到相同的效果。
如果队列已满,Add(enqueue) 只会跳过 (returns) 插入。
Remove(dequeue) 如果队列为空则跳过删除。
我使用 getRandom() 生成了一堆 0 到 1 之间的随机数,并用这些数字来决定执行 add 还是 remove。
性能: 我已经尝试使用 static/dynamic 线程分配测试队列。两种分配算法的 >1 个线程的执行时间都明显变慢。
//g++ -std=c++0x -pthread -o block blocking.cpp;./block
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <ctime>
#include <iostream>
#include <mutex>
#include <random>
#include <thread>
#include <vector>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
using namespace std;
#define NUMBER_OF_THREADS 1
#define NUMBER_OF_OPERATIONS 10000000
#define QUEUE_CAPACITY 1000000
std::vector<double> getRandom();
// Bounded FIFO queue protected by two locks: enqLock serializes producers,
// deqLock serializes consumers (two-lock bounded-queue scheme).
// NOTE(review): the buffer allocated in the ctor is never deleted -- no
// destructor is declared, so the backing store leaks; confirm if intended.
// NOTE(review): add() reads `head` holding only enqLock and remove() reads
// `tail` holding only deqLock, so head/tail are accessed concurrently
// without synchronization -- a data race under the C++11 memory model.
template <class T> class BoundedQueue {
private:
T * array; // circular buffer of `capacity` slots; slot = index % capacity
int head, tail, capacity; // head/tail grow monotonically; size = tail - head
std::mutex enqLock, deqLock; // one lock per end of the queue
std::atomic<long> sharedCounter; // global work index used by dynamicAllocation()
std::condition_variable notEmptyCondition, notFullCondition; // declared but never used in this file
public:
void staticAllocation(double randomNumbers[], int threadID);
void dynamicAllocation(double randomNumbers[]);
void add (T x);
BoundedQueue ();
void remove ();
};
// Construct an empty queue with a fixed capacity of QUEUE_CAPACITY slots.
// Improvement: members are set in a member-initializer list instead of
// assignment in the body (the idiomatic form; order follows declaration
// order: array, head, tail, capacity, then sharedCounter).
// NOTE(review): `array` is never freed -- the class declares no destructor,
// so this allocation leaks; confirm whether a dtor should be added.
template <class T> BoundedQueue<T>::BoundedQueue ()
    : array(new T[QUEUE_CAPACITY]),
      head(0),
      tail(0),
      capacity(QUEUE_CAPACITY),
      sharedCounter(0) {
}
// Enqueue x, or silently drop it when the queue is full (no blocking).
// Fix: std::lock_guard replaces the manual lock()/unlock() pair. The
// original left enqLock held forever if copying x into the buffer threw;
// RAII releases the mutex on every exit path.
template <class T> void BoundedQueue<T>::add (T x) {
    std::lock_guard<std::mutex> guard(enqLock);
    // Full when the writer is exactly one lap ahead of the reader.
    // NOTE(review): `head` is read here without holding deqLock -- data
    // race with remove(); making head/tail std::atomic<int> would fix it.
    if (tail - head == capacity) return;
    array[tail % capacity] = x;
    tail++;
}
// Dequeue one element, or do nothing when the queue is empty (no blocking).
// The dequeued value is discarded -- the method returns void.
// Fix: std::lock_guard replaces the manual lock()/unlock() pair so deqLock
// is released even if T's copy constructor throws.
template <class T> void BoundedQueue<T>::remove() {
    std::lock_guard<std::mutex> guard(deqLock);
    // Empty when the reader has caught up with the writer.
    // NOTE(review): `tail` is read here without holding enqLock -- data
    // race with add().
    if (tail - head == 0) return;
    T result = array [head % capacity]; // copy kept so any T copy side effects match the original
    (void)result;                       // value intentionally unused
    head++;
}
// Dynamic (work-stealing) driver: each thread repeatedly claims the next
// global index with an atomic fetch_add and performs one operation for it
// (add(0) when randomNumbers[i] <= 0.5, remove() otherwise).
//
// Fixes two defects in the original:
//  1. The loop bound was QUEUE_CAPACITY (1,000,000) instead of
//     NUMBER_OF_OPERATIONS (10,000,000), so only a tenth of the prepared
//     random numbers were consumed and the run was not comparable to
//     staticAllocation.
//  2. The bound was tested BEFORE claiming a fresh index, so every thread
//     used one index at/after the bound before noticing. Claim-then-check
//     validates the index before it is ever used.
template <class T> void BoundedQueue<T>::dynamicAllocation(double randomNumbers[]) {
    for (;;) {
        long i = sharedCounter.fetch_add(1, std::memory_order_relaxed);
        if (i >= NUMBER_OF_OPERATIONS) break; // all work has been claimed
        if (randomNumbers[i] <= 0.5) add(0);
        else remove();
    }
}
// Static partitioning driver: thread `threadID` owns a fixed contiguous
// slice of the random-number array and performs one operation per entry
// (add(0) for values <= 0.5, remove() otherwise).
template <class T> void BoundedQueue<T>::staticAllocation (double randomNumbers[], int threadID) {
    const int opsPerThread = NUMBER_OF_OPERATIONS / NUMBER_OF_THREADS;
    const int first = threadID * opsPerThread;
    const int last  = first + opsPerThread;
    for (int idx = first; idx < last; ++idx) {
        if (randomNumbers[idx] > 0.5)
            remove();
        else
            add(0);
    }
}
// Produce NUMBER_OF_OPERATIONS uniform random doubles in [0, 1).
// Each value later selects between add() (<= 0.5) and remove() (> 0.5).
// Improvement: reserve() the final size up front -- the original grew the
// vector through repeated reallocations across 10M push_backs, each one
// copying the entire buffer.
std::vector<double> getRandom() {
    std::vector<double> numbers;
    numbers.reserve(NUMBER_OF_OPERATIONS); // single allocation instead of repeated regrowth
    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_real_distribution<> dis(0,1);
    for(int i = 0; i < NUMBER_OF_OPERATIONS; i++) numbers.push_back(dis(gen));
    return numbers;
}
// Benchmark driver: spawn NUMBER_OF_THREADS workers running
// dynamicAllocation over a shared random-number array, then print the
// elapsed time in milliseconds.
//
// Fix: the original timed with clock(), which on POSIX returns CPU time
// SUMMED ACROSS ALL THREADS -- with N busy threads it reports roughly N
// times the wall time, which is precisely why ">1 threads" appeared
// slower. std::chrono::steady_clock measures monotonic wall-clock time.
int main () {
    BoundedQueue<int> bQ;
    std::vector<double> temp = getRandom();
    double* randomNumbers = temp.data(); // valid for the threads' lifetime: temp lives until main returns
    std::thread myThreads[NUMBER_OF_THREADS];
    auto begin = std::chrono::steady_clock::now();
    for(int i = 0; i < NUMBER_OF_THREADS; i++) {
        myThreads[i] = std::thread ( [&] {bQ.dynamicAllocation(randomNumbers); });
    }
    for(int i = 0; i < NUMBER_OF_THREADS; i++) {
        if(myThreads[i].joinable()) myThreads[i].join();
    }
    auto end = std::chrono::steady_clock::now();
    cout << std::chrono::duration<double, std::milli>(end - begin).count();
    return 0;
}
这里没有并行。请注意,由于您在函数开始时加锁、直到函数结束才释放,您的所有线程实际上几乎被串行化了。
尽管由于缺乏并行性,工作在线程之间进行了分配,但 lock/unlock 的开销占主导地位,并且与单线程相比,整体执行时间较长。
所以有并发性但是没有并行性我们只是付出了同步的代价而没有性能优势。
队列并发有两种不同的锁:一种用于enqueue()以保护多个线程同时入队,另一种用于dequeue()以达到相同的效果。
如果队列已满,Add(enqueue) 只会跳过 (returns) 插入。 Remove(dequeue) 如果队列为空则跳过删除。
我使用 getRandom() 生成了一堆 0 到 1 之间的随机数,并用这些数字来决定执行 add 还是 remove。
性能: 我已经尝试使用 static/dynamic 线程分配测试队列。两种分配算法的 >1 个线程的执行时间都明显变慢。
//g++ -std=c++0x -pthread -o block blocking.cpp;./block
#include <iostream>
#include <thread>
#include <mutex>
#include <atomic>
#include <random>
#include <stdio.h>
#include <time.h>
#include <stdlib.h>
#include <ctime>
#include <condition_variable>
using namespace std;
#define NUMBER_OF_THREADS 1
#define NUMBER_OF_OPERATIONS 10000000
#define QUEUE_CAPACITY 1000000
std::vector<double> getRandom();
// Bounded FIFO queue protected by two locks: enqLock serializes producers,
// deqLock serializes consumers (two-lock bounded-queue scheme).
// NOTE(review): the buffer allocated in the ctor is never deleted -- no
// destructor is declared, so the backing store leaks; confirm if intended.
// NOTE(review): add() reads `head` holding only enqLock and remove() reads
// `tail` holding only deqLock, so head/tail are accessed concurrently
// without synchronization -- a data race under the C++11 memory model.
template <class T> class BoundedQueue {
private:
T * array; // circular buffer of `capacity` slots; slot = index % capacity
int head, tail, capacity; // head/tail grow monotonically; size = tail - head
std::mutex enqLock, deqLock; // one lock per end of the queue
std::atomic<long> sharedCounter; // global work index used by dynamicAllocation()
std::condition_variable notEmptyCondition, notFullCondition; // declared but never used in this file
public:
void staticAllocation(double randomNumbers[], int threadID);
void dynamicAllocation(double randomNumbers[]);
void add (T x);
BoundedQueue ();
void remove ();
};
// Construct an empty queue with a fixed capacity of QUEUE_CAPACITY slots.
// Improvement: members are set in a member-initializer list instead of
// assignment in the body (the idiomatic form; order follows declaration
// order: array, head, tail, capacity, then sharedCounter).
// NOTE(review): `array` is never freed -- the class declares no destructor,
// so this allocation leaks; confirm whether a dtor should be added.
template <class T> BoundedQueue<T>::BoundedQueue ()
    : array(new T[QUEUE_CAPACITY]),
      head(0),
      tail(0),
      capacity(QUEUE_CAPACITY),
      sharedCounter(0) {
}
// Enqueue x, or silently drop it when the queue is full (no blocking).
// Fix: std::lock_guard replaces the manual lock()/unlock() pair. The
// original left enqLock held forever if copying x into the buffer threw;
// RAII releases the mutex on every exit path.
template <class T> void BoundedQueue<T>::add (T x) {
    std::lock_guard<std::mutex> guard(enqLock);
    // Full when the writer is exactly one lap ahead of the reader.
    // NOTE(review): `head` is read here without holding deqLock -- data
    // race with remove(); making head/tail std::atomic<int> would fix it.
    if (tail - head == capacity) return;
    array[tail % capacity] = x;
    tail++;
}
// Dequeue one element, or do nothing when the queue is empty (no blocking).
// The dequeued value is discarded -- the method returns void.
// Fix: std::lock_guard replaces the manual lock()/unlock() pair so deqLock
// is released even if T's copy constructor throws.
template <class T> void BoundedQueue<T>::remove() {
    std::lock_guard<std::mutex> guard(deqLock);
    // Empty when the reader has caught up with the writer.
    // NOTE(review): `tail` is read here without holding enqLock -- data
    // race with add().
    if (tail - head == 0) return;
    T result = array [head % capacity]; // copy kept so any T copy side effects match the original
    (void)result;                       // value intentionally unused
    head++;
}
// Dynamic (work-stealing) driver: each thread repeatedly claims the next
// global index with an atomic fetch_add and performs one operation for it
// (add(0) when randomNumbers[i] <= 0.5, remove() otherwise).
//
// Fixes two defects in the original:
//  1. The loop bound was QUEUE_CAPACITY (1,000,000) instead of
//     NUMBER_OF_OPERATIONS (10,000,000), so only a tenth of the prepared
//     random numbers were consumed and the run was not comparable to
//     staticAllocation.
//  2. The bound was tested BEFORE claiming a fresh index, so every thread
//     used one index at/after the bound before noticing. Claim-then-check
//     validates the index before it is ever used.
template <class T> void BoundedQueue<T>::dynamicAllocation(double randomNumbers[]) {
    for (;;) {
        long i = sharedCounter.fetch_add(1, std::memory_order_relaxed);
        if (i >= NUMBER_OF_OPERATIONS) break; // all work has been claimed
        if (randomNumbers[i] <= 0.5) add(0);
        else remove();
    }
}
// Static partitioning driver: thread `threadID` owns a fixed contiguous
// slice of the random-number array and performs one operation per entry
// (add(0) for values <= 0.5, remove() otherwise).
template <class T> void BoundedQueue<T>::staticAllocation (double randomNumbers[], int threadID) {
    const int opsPerThread = NUMBER_OF_OPERATIONS / NUMBER_OF_THREADS;
    const int first = threadID * opsPerThread;
    const int last  = first + opsPerThread;
    for (int idx = first; idx < last; ++idx) {
        if (randomNumbers[idx] > 0.5)
            remove();
        else
            add(0);
    }
}
// Produce NUMBER_OF_OPERATIONS uniform random doubles in [0, 1).
// Each value later selects between add() (<= 0.5) and remove() (> 0.5).
// Improvement: reserve() the final size up front -- the original grew the
// vector through repeated reallocations across 10M push_backs, each one
// copying the entire buffer.
std::vector<double> getRandom() {
    std::vector<double> numbers;
    numbers.reserve(NUMBER_OF_OPERATIONS); // single allocation instead of repeated regrowth
    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_real_distribution<> dis(0,1);
    for(int i = 0; i < NUMBER_OF_OPERATIONS; i++) numbers.push_back(dis(gen));
    return numbers;
}
// Benchmark driver: spawn NUMBER_OF_THREADS workers running
// dynamicAllocation over a shared random-number array, then print the
// elapsed time in milliseconds.
//
// Fix: the original timed with clock(), which on POSIX returns CPU time
// SUMMED ACROSS ALL THREADS -- with N busy threads it reports roughly N
// times the wall time, which is precisely why ">1 threads" appeared
// slower. std::chrono::steady_clock measures monotonic wall-clock time.
int main () {
    BoundedQueue<int> bQ;
    std::vector<double> temp = getRandom();
    double* randomNumbers = temp.data(); // valid for the threads' lifetime: temp lives until main returns
    std::thread myThreads[NUMBER_OF_THREADS];
    auto begin = std::chrono::steady_clock::now();
    for(int i = 0; i < NUMBER_OF_THREADS; i++) {
        myThreads[i] = std::thread ( [&] {bQ.dynamicAllocation(randomNumbers); });
    }
    for(int i = 0; i < NUMBER_OF_THREADS; i++) {
        if(myThreads[i].joinable()) myThreads[i].join();
    }
    auto end = std::chrono::steady_clock::now();
    cout << std::chrono::duration<double, std::milli>(end - begin).count();
    return 0;
}
这里没有并行。请注意,由于您在函数开始时加锁、直到函数结束才释放,您的所有线程实际上几乎被串行化了。
尽管由于缺乏并行性,工作在线程之间进行了分配,但 lock/unlock 的开销占主导地位,并且与单线程相比,整体执行时间较长。
所以有并发性但是没有并行性我们只是付出了同步的代价而没有性能优势。