Producer/Consumer,如何确保在关闭所有消费者之前耗尽线程安全队列?

Producer/Consumer, How to make sure a thread safe queue is drained before turning off all consumers?

我正在处理一个使用线程安全队列的项目。这基本上是一个 producer/consumer 问题。

我目前的代码是

void threadCode()//the consumer
{
    while(active) // active is an atomic int, we use it to turn off everything during destruction
    {
        threadSafeQueue.popFront();//if queue is empty, it will wait for a signal.The queue has a CV.
        // process it
        // if it fails to process it but less than three times, 
        // put it back to the queue to retry later
    }
}

问题是,当我的析构函数将active变为0时,即使队列不为空,所有线程也会被终止。比如它处理item失败,把它放回队列,那么现在active就是0了。

我不希望发生这种情况。我希望在处理队列中的所有内容后销毁该实例。

所以我尝试了这个,

void threadCode()
{
    while( active || queue.size() != 0 )
    { //[1]
        queue.popFront();
        //process
        // put it back to the queue if it fails less than 3 times
    }
}

queue.size() 和 queue.popFront() 是线程安全的。但是将它们放在一起并不是...如果队列中只剩下一件事并且上下文切换发生在 [1] 处。该线程可能会永远休眠。

因为我在析构函数中有类似 threadpool.join() 的东西,而且那个线程永远不会醒来。问题就停留在那里。

不知道有没有人有更好的办法解决这个问题?

谢谢!!

不是让消费者线程检查外部标志,而是让队列本身维护一个内部 "shutting down" 标志。如果没有更多的工作要处理,那么 .popFront() 函数 returns 一个 "shutting down" 状态而不是要处理的项目。