Producer/Consumer,如何确保在关闭所有消费者之前耗尽线程安全队列?
Producer/Consumer, How to make sure a thread safe queue is drained before turning off all consumers?
我正在处理一个使用线程安全队列的项目。这基本上是一个 producer/consumer 问题。
我目前的代码是
void threadCode()//the consumer
{
while(active) // active is an atomic int, we use it to turn off everything during destruction
{
threadSafeQueue.popFront();//if queue is empty, it will wait for a signal.The queue has a CV.
// process it
// if it fails to process it but less than three times,
// put it back to the queue to retry later
}
}
问题是,当我的析构函数将active变为0时,即使队列不为空,所有线程也会被终止。比如它处理item失败,把它放回队列,那么现在active就是0了。
我不希望发生这种情况。我希望在处理队列中的所有内容后销毁该实例。
所以我尝试了这个,
void threadCode()
{
while( active || queue.size() != 0 )
{ //[1]
queue.popFront();
//process
// put it back to the queue if it fails less than 3 times
}
}
queue.size() 和 queue.popFront() 是线程安全的。但是将它们放在一起并不是...如果队列中只剩下一件事并且上下文切换发生在 [1] 处。该线程可能会永远休眠。
因为我在析构函数中有类似 threadpool.join() 的东西,而且那个线程永远不会醒来。问题就停留在那里。
不知道有没有人有更好的办法解决这个问题?
谢谢!!
不是让消费者线程检查外部标志,而是让队列本身维护一个内部 "shutting down" 标志。如果没有更多的工作要处理,那么 .popFront()
函数 returns 一个 "shutting down" 状态而不是要处理的项目。
我正在处理一个使用线程安全队列的项目。这基本上是一个 producer/consumer 问题。
我目前的代码是
void threadCode()//the consumer
{
while(active) // active is an atomic int, we use it to turn off everything during destruction
{
threadSafeQueue.popFront();//if queue is empty, it will wait for a signal.The queue has a CV.
// process it
// if it fails to process it but less than three times,
// put it back to the queue to retry later
}
}
问题是,当我的析构函数将active变为0时,即使队列不为空,所有线程也会被终止。比如它处理item失败,把它放回队列,那么现在active就是0了。
我不希望发生这种情况。我希望在处理队列中的所有内容后销毁该实例。
所以我尝试了这个,
void threadCode()
{
while( active || queue.size() != 0 )
{ //[1]
queue.popFront();
//process
// put it back to the queue if it fails less than 3 times
}
}
queue.size() 和 queue.popFront() 是线程安全的。但是将它们放在一起并不是...如果队列中只剩下一件事并且上下文切换发生在 [1] 处。该线程可能会永远休眠。
因为我在析构函数中有类似 threadpool.join() 的东西,而且那个线程永远不会醒来。问题就停留在那里。
不知道有没有人有更好的办法解决这个问题?
谢谢!!
不是让消费者线程检查外部标志,而是让队列本身维护一个内部 "shutting down" 标志。如果没有更多的工作要处理,那么 .popFront()
函数 returns 一个 "shutting down" 状态而不是要处理的项目。