C ++中阻塞队列和障碍的死锁

Deadlock with blocking queue and barrier in C++

我有一个非常简单的小型 C++ 程序,它创建一个 线程池 ,然后将 消息 放入 线程间共享的阻塞队列 告诉每个线程该做什么。

消息可以是:-1(流结束 -> 终止),-2(屏障 -> 等待所有线程到达它,然后继续),其他值 进行随机计算。循环按以下顺序完成:一些计算、屏障、一些计算、屏障、...、屏障、流结束、线程连接、退出。

我无法理解为什么我会出现死锁,即使池中有 2 个线程。队列不能变空,但是我推送和弹出消息的顺序总是会导致空队列!

阻塞队列实现是这里提出的 (C++ Equivalent to Java's BlockingQueue),只添加了两个方法。我也复制下面的队列代码。

有什么帮助吗?

Main.cpp

#include <iostream>
#include <vector>
#include <thread>
#include "Queue.hpp"

using namespace std;

// function executed by each thread
void f(int i, Queue<int> &q){
    while(1){
        // take a message from blocking queue
        int j= q.pop();
        // if it is end of stream then exit
        if (j==-1) break;
        // if it is barrier, wait for other threads to reach it
        if (j==-2){
            // active wait! BAD, but anyway...
            while(q.size() > 0){
                ;
            }
        }
        else{
            // random stuff
            int x = 0;
            for(int i=0;i<j;i++)
                x += 4;
        }
    }
}

int main(){
    Queue<int> queue; //blocking queue
    vector<thread> tids; // thread pool
    int nt = 2; // number of threads
    int dim = 8; // number to control number of operations

    // create thread pool, passing thread id and queue
    for(int i=0;i<nt;i++)
        tids.push_back(thread(f,i, std::ref(queue)));

    for(int dist=1; dist<=dim; dist++){ // without this outer loop the program works fine

        // push random number
        for(int j=0;j<dist;j++){    
            queue.push(4);  
        }

        // push barrier code
        for(int i=0;i<nt;i++){
            queue.push(-2);
        }

        // active wait! BAD, but anyway...
        while (queue.size()>0){
                 ;
        }
    }
    // push end of stream
    for(int i=0;i<nt;i++)
        queue.push(-1);
    // join thread pool
    for(int i=0;i<nt;i++){
        tids[i].join();
    }           
return 0;
}

Queue.hpp

#include <deque>
#include <mutex>
#include <condition_variable>

template <typename T>
class Queue
{
private:
  std::mutex              d_mutex;
  std::condition_variable d_condition;
  std::deque<T>           d_queue;
public:

  void push(T const& value) {
    {
      std::unique_lock<std::mutex> lock(this->d_mutex);
      d_queue.push_front(value);
    }
    this->d_condition.notify_one();
  }

  T pop() {
    std::unique_lock<std::mutex> lock(this->d_mutex);
    this->d_condition.wait(lock, [=]{ return !this->d_queue.empty(); });
    T rc(std::move(this->d_queue.back()));
    this->d_queue.pop_back();
    return rc;
  }

  bool empty(){
      std::unique_lock<std::mutex> lock(this->d_mutex); 
      return this->d_queue.empty(); 
  }

  int size(){
    std::unique_lock<std::mutex> lock(this->d_mutex); 
    return this->d_queue.size();
  }
};

我认为问题在于您描述为 "BAD, but anyway..." 的主动等待并使用队列的大小作为障碍而不是使用真正的 synchronization barrier

对于 dim =1,您推送一个包含 4、-2、-2 的队列。一个线程将获取 4 和 -2,而另一个线程将获取剩余的 -2。此时队列为空,您有三个线程(两个工作线程和主线程)进行主动等待以查看队列是否已清空。大小上有一个互斥量,一次只能读取一个大小。如果主线程首先被调度并确定队列为空,它将推送 -1,-1 以表示流结束。现在,队列不再为空,而是两个工作线程中的一个或两个都在等待它变空。由于他们在拿走另一个项目之前等待它为空,因此队列在这种状态下处于死锁状态。

对于 dim > 1 的情况,在两个工作确认清空队列并退出活动等待之前,将下一组值推入主线程的队列可能存在类似的问题。

我有 运行 你的代码,我明白这个问题。问题在于“-2”选项。当两个线程到达这一点时,您的主线程已经将另一个值推送到队列中。因此,如果您的队列在您的线程获得“-2”值和它们到达“-2”选项之前之间增加了它的大小,您的代码将卡住: 线程 1:得到 -2。 线程 2:得到 -2。 主线程:push -1。 主线程:push -1。 线程 1:等到整个队列为空。 线程 2:等到整个队列都为空。

队列: -1 -1

^ 如果 dim 等于 1。在您的代码中,dim 等于 8,您不想看到它的样子.. 为了解决这个问题,我所做的就是禁用以下循环:

for(int i=0;i<nt;i++){
    queue.push(-2);
}

当此pard disable时,代码运行完美。 我是这样检查的:

std::mutex guarder;

// function executed by each thread
void f(int i, Queue<int> &q){
    while(1){
        // take a message from blocking queue
        guarder.lock();
        int j= q.pop();
        guarder.unlock();
        // if it is end of stream then exit
        if (j==-1) break;
        // if it is barrier, wait for other threads to reach it
        if (j==-2){
            // active wait! BAD, but anyway...
            while(q.size() > 0){
                ;
            }
        }
        else{
            // random stuff
            int x = 0;
            for(int i=0;i<j;i++)
                x += 4;
            guarder.lock();
            cout << x << std::endl;
            guarder.unlock();
        }
    }
}

int main(){
    Queue<int> queue; //blocking queue
    vector<thread> tids; // thread pool
    int nt = 2; // number of threads
    int dim = 8; // number to control number of operations

    // create thread pool, passing thread id and queue
    for(int i=0;i<nt;i++)
        tids.push_back(thread(f,i, std::ref(queue)));

    for(int dist=1; dist<=dim; dist++){ // without this outer loop the program works fine

        // push random number
        for(int j=0;j<dist;j++){
            queue.push(dist);
        }

        /*// push barrier code
        for(int i=0;i<nt;i++){
            queue.push(-2);
        }*/

        // active wait! BAD, but anyway...
        while (queue.size()>0){
            ;
        }
    }
    // push end of stream
    for(int i=0;i<nt;i++)
        queue.push(-1);
    // join thread pool
    for(int i=0;i<nt;i++){
        tids[i].join();
    }
    return 0;
}

结果:

4
8
8
12
12
12
16
16
16
20
20
16
20
20
20
24
24
24
24
24
24
28
28
28
28
28
28
28
32
32
32
32
32
32
32
32

顺便说一句,卡住并没有发生,因为你的 "active wait" 部分。这不好,但通常会导致其他问题(比如减慢系统速度)。