C ++中阻塞队列和障碍的死锁
Deadlock with blocking queue and barrier in C++
我有一个非常简单的小型 C++ 程序,它创建一个 线程池 ,然后将 消息 放入 线程间共享的阻塞队列 告诉每个线程该做什么。
消息可以是:-1(流结束 -> 终止),-2(屏障 -> 等待所有线程到达它,然后继续),其他值 进行随机计算。循环按以下顺序完成:一些计算、屏障、一些计算、屏障、...、屏障、流结束、线程连接、退出。
我无法理解为什么我会出现死锁,即使池中有 2 个线程。队列不能变空,但是我推送和弹出消息的顺序总是会导致空队列!
阻塞队列实现是这里提出的 (C++ Equivalent to Java's BlockingQueue),只添加了两个方法。我也复制下面的队列代码。
有什么帮助吗?
Main.cpp
#include <iostream>
#include <vector>
#include <thread>
#include "Queue.hpp"
using namespace std;
// function executed by each thread
void f(int i, Queue<int> &q){
while(1){
// take a message from blocking queue
int j= q.pop();
// if it is end of stream then exit
if (j==-1) break;
// if it is barrier, wait for other threads to reach it
if (j==-2){
// active wait! BAD, but anyway...
while(q.size() > 0){
;
}
}
else{
// random stuff
int x = 0;
for(int i=0;i<j;i++)
x += 4;
}
}
}
int main(){
Queue<int> queue; //blocking queue
vector<thread> tids; // thread pool
int nt = 2; // number of threads
int dim = 8; // number to control number of operations
// create thread pool, passing thread id and queue
for(int i=0;i<nt;i++)
tids.push_back(thread(f,i, std::ref(queue)));
for(int dist=1; dist<=dim; dist++){ // without this outer loop the program works fine
// push random number
for(int j=0;j<dist;j++){
queue.push(4);
}
// push barrier code
for(int i=0;i<nt;i++){
queue.push(-2);
}
// active wait! BAD, but anyway...
while (queue.size()>0){
;
}
}
// push end of stream
for(int i=0;i<nt;i++)
queue.push(-1);
// join thread pool
for(int i=0;i<nt;i++){
tids[i].join();
}
return 0;
}
Queue.hpp
#include <deque>
#include <mutex>
#include <condition_variable>
template <typename T>
class Queue
{
private:
std::mutex d_mutex;
std::condition_variable d_condition;
std::deque<T> d_queue;
public:
void push(T const& value) {
{
std::unique_lock<std::mutex> lock(this->d_mutex);
d_queue.push_front(value);
}
this->d_condition.notify_one();
}
T pop() {
std::unique_lock<std::mutex> lock(this->d_mutex);
this->d_condition.wait(lock, [=]{ return !this->d_queue.empty(); });
T rc(std::move(this->d_queue.back()));
this->d_queue.pop_back();
return rc;
}
bool empty(){
std::unique_lock<std::mutex> lock(this->d_mutex);
return this->d_queue.empty();
}
int size(){
std::unique_lock<std::mutex> lock(this->d_mutex);
return this->d_queue.size();
}
};
我认为问题在于您描述为 "BAD, but anyway..." 的主动等待并使用队列的大小作为障碍而不是使用真正的 synchronization barrier
对于 dim =1,您推送一个包含 4、-2、-2 的队列。一个线程将获取 4 和 -2,而另一个线程将获取剩余的 -2。此时队列为空,您有三个线程(两个工作线程和主线程)进行主动等待以查看队列是否已清空。大小上有一个互斥量,一次只能读取一个大小。如果主线程首先被调度并确定队列为空,它将推送 -1,-1 以表示流结束。现在,队列不再为空,而是两个工作线程中的一个或两个都在等待它变空。由于他们在拿走另一个项目之前等待它为空,因此队列在这种状态下处于死锁状态。
对于 dim > 1 的情况,在两个工作确认清空队列并退出活动等待之前,将下一组值推入主线程的队列可能存在类似的问题。
我有 运行 你的代码,我明白这个问题。问题在于“-2”选项。当两个线程到达这一点时,您的主线程已经将另一个值推送到队列中。因此,如果您的队列在您的线程获得“-2”值和它们到达“-2”选项之前之间增加了它的大小,您的代码将卡住:
线程 1:得到 -2。
线程 2:得到 -2。
主线程:push -1。
主线程:push -1。
线程 1:等到整个队列为空。
线程 2:等到整个队列都为空。
队列:
-1
-1
^ 如果 dim 等于 1。在您的代码中,dim 等于 8,您不想看到它的样子..
为了解决这个问题,我所做的就是禁用以下循环:
for(int i=0;i<nt;i++){
queue.push(-2);
}
当此pard disable时,代码运行完美。
我是这样检查的:
std::mutex guarder;
// function executed by each thread
void f(int i, Queue<int> &q){
while(1){
// take a message from blocking queue
guarder.lock();
int j= q.pop();
guarder.unlock();
// if it is end of stream then exit
if (j==-1) break;
// if it is barrier, wait for other threads to reach it
if (j==-2){
// active wait! BAD, but anyway...
while(q.size() > 0){
;
}
}
else{
// random stuff
int x = 0;
for(int i=0;i<j;i++)
x += 4;
guarder.lock();
cout << x << std::endl;
guarder.unlock();
}
}
}
int main(){
Queue<int> queue; //blocking queue
vector<thread> tids; // thread pool
int nt = 2; // number of threads
int dim = 8; // number to control number of operations
// create thread pool, passing thread id and queue
for(int i=0;i<nt;i++)
tids.push_back(thread(f,i, std::ref(queue)));
for(int dist=1; dist<=dim; dist++){ // without this outer loop the program works fine
// push random number
for(int j=0;j<dist;j++){
queue.push(dist);
}
/*// push barrier code
for(int i=0;i<nt;i++){
queue.push(-2);
}*/
// active wait! BAD, but anyway...
while (queue.size()>0){
;
}
}
// push end of stream
for(int i=0;i<nt;i++)
queue.push(-1);
// join thread pool
for(int i=0;i<nt;i++){
tids[i].join();
}
return 0;
}
结果:
4
8
8
12
12
12
16
16
16
20
20
16
20
20
20
24
24
24
24
24
24
28
28
28
28
28
28
28
32
32
32
32
32
32
32
32
顺便说一句,卡住并没有发生,因为你的 "active wait" 部分。这不好,但通常会导致其他问题(比如减慢系统速度)。
我有一个非常简单的小型 C++ 程序,它创建一个 线程池 ,然后将 消息 放入 线程间共享的阻塞队列 告诉每个线程该做什么。
消息可以是:-1(流结束 -> 终止),-2(屏障 -> 等待所有线程到达它,然后继续),其他值 进行随机计算。循环按以下顺序完成:一些计算、屏障、一些计算、屏障、...、屏障、流结束、线程连接、退出。
我无法理解为什么我会出现死锁,即使池中有 2 个线程。队列不能变空,但是我推送和弹出消息的顺序总是会导致空队列!
阻塞队列实现是这里提出的 (C++ Equivalent to Java's BlockingQueue),只添加了两个方法。我也复制下面的队列代码。
有什么帮助吗?
Main.cpp
#include <iostream>
#include <vector>
#include <thread>
#include "Queue.hpp"
using namespace std;
// function executed by each thread
void f(int i, Queue<int> &q){
while(1){
// take a message from blocking queue
int j= q.pop();
// if it is end of stream then exit
if (j==-1) break;
// if it is barrier, wait for other threads to reach it
if (j==-2){
// active wait! BAD, but anyway...
while(q.size() > 0){
;
}
}
else{
// random stuff
int x = 0;
for(int i=0;i<j;i++)
x += 4;
}
}
}
int main(){
Queue<int> queue; //blocking queue
vector<thread> tids; // thread pool
int nt = 2; // number of threads
int dim = 8; // number to control number of operations
// create thread pool, passing thread id and queue
for(int i=0;i<nt;i++)
tids.push_back(thread(f,i, std::ref(queue)));
for(int dist=1; dist<=dim; dist++){ // without this outer loop the program works fine
// push random number
for(int j=0;j<dist;j++){
queue.push(4);
}
// push barrier code
for(int i=0;i<nt;i++){
queue.push(-2);
}
// active wait! BAD, but anyway...
while (queue.size()>0){
;
}
}
// push end of stream
for(int i=0;i<nt;i++)
queue.push(-1);
// join thread pool
for(int i=0;i<nt;i++){
tids[i].join();
}
return 0;
}
Queue.hpp
#include <deque>
#include <mutex>
#include <condition_variable>
template <typename T>
class Queue
{
private:
std::mutex d_mutex;
std::condition_variable d_condition;
std::deque<T> d_queue;
public:
void push(T const& value) {
{
std::unique_lock<std::mutex> lock(this->d_mutex);
d_queue.push_front(value);
}
this->d_condition.notify_one();
}
T pop() {
std::unique_lock<std::mutex> lock(this->d_mutex);
this->d_condition.wait(lock, [=]{ return !this->d_queue.empty(); });
T rc(std::move(this->d_queue.back()));
this->d_queue.pop_back();
return rc;
}
bool empty(){
std::unique_lock<std::mutex> lock(this->d_mutex);
return this->d_queue.empty();
}
int size(){
std::unique_lock<std::mutex> lock(this->d_mutex);
return this->d_queue.size();
}
};
我认为问题在于您描述为 "BAD, but anyway..." 的主动等待并使用队列的大小作为障碍而不是使用真正的 synchronization barrier
对于 dim =1,您推送一个包含 4、-2、-2 的队列。一个线程将获取 4 和 -2,而另一个线程将获取剩余的 -2。此时队列为空,您有三个线程(两个工作线程和主线程)进行主动等待以查看队列是否已清空。大小上有一个互斥量,一次只能读取一个大小。如果主线程首先被调度并确定队列为空,它将推送 -1,-1 以表示流结束。现在,队列不再为空,而是两个工作线程中的一个或两个都在等待它变空。由于他们在拿走另一个项目之前等待它为空,因此队列在这种状态下处于死锁状态。
对于 dim > 1 的情况,在两个工作确认清空队列并退出活动等待之前,将下一组值推入主线程的队列可能存在类似的问题。
我有 运行 你的代码,我明白这个问题。问题在于“-2”选项。当两个线程到达这一点时,您的主线程已经将另一个值推送到队列中。因此,如果您的队列在您的线程获得“-2”值和它们到达“-2”选项之前之间增加了它的大小,您的代码将卡住: 线程 1:得到 -2。 线程 2:得到 -2。 主线程:push -1。 主线程:push -1。 线程 1:等到整个队列为空。 线程 2:等到整个队列都为空。
队列: -1 -1
^ 如果 dim 等于 1。在您的代码中,dim 等于 8,您不想看到它的样子.. 为了解决这个问题,我所做的就是禁用以下循环:
for(int i=0;i<nt;i++){
queue.push(-2);
}
当此pard disable时,代码运行完美。 我是这样检查的:
std::mutex guarder;
// function executed by each thread
void f(int i, Queue<int> &q){
while(1){
// take a message from blocking queue
guarder.lock();
int j= q.pop();
guarder.unlock();
// if it is end of stream then exit
if (j==-1) break;
// if it is barrier, wait for other threads to reach it
if (j==-2){
// active wait! BAD, but anyway...
while(q.size() > 0){
;
}
}
else{
// random stuff
int x = 0;
for(int i=0;i<j;i++)
x += 4;
guarder.lock();
cout << x << std::endl;
guarder.unlock();
}
}
}
int main(){
Queue<int> queue; //blocking queue
vector<thread> tids; // thread pool
int nt = 2; // number of threads
int dim = 8; // number to control number of operations
// create thread pool, passing thread id and queue
for(int i=0;i<nt;i++)
tids.push_back(thread(f,i, std::ref(queue)));
for(int dist=1; dist<=dim; dist++){ // without this outer loop the program works fine
// push random number
for(int j=0;j<dist;j++){
queue.push(dist);
}
/*// push barrier code
for(int i=0;i<nt;i++){
queue.push(-2);
}*/
// active wait! BAD, but anyway...
while (queue.size()>0){
;
}
}
// push end of stream
for(int i=0;i<nt;i++)
queue.push(-1);
// join thread pool
for(int i=0;i<nt;i++){
tids[i].join();
}
return 0;
}
结果:
4
8
8
12
12
12
16
16
16
20
20
16
20
20
20
24
24
24
24
24
24
28
28
28
28
28
28
28
32
32
32
32
32
32
32
32
顺便说一句,卡住并没有发生,因为你的 "active wait" 部分。这不好,但通常会导致其他问题(比如减慢系统速度)。