如何仅在所有任务完成时使用 C++ 中的 boost 将新任务添加到线程池

How to add new tasks to threadpool only when all the tasks are finished using boost in c++

boost::asio::io_service ioService;
boost::thread_group threadpool;
boost::barrier barrier(5);
boost::asio::io_service::work work(ioService);
threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioService));  //Thread 1
threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioService));  //Thread 2
threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioService));  //Thread 3 threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioService));  //Thread 4

while (true)
{
  { 
     boost::lock_guard<boost::recursive_mutex> lock(mutex_);
     map::iterator it = m_map.begin();   
     while (it != m_map.end())
     {
         ioService.post(boost::bind(&ProcessFun, this, it));
         ++it;    
     }

    ioService.run(); <-- main thread is stuck here..  
   }

}

我希望能够知道分配给线程池的所有任务都已经完成,只有在再次将任务分配给线程池之后。

只要线程正在处理任务我就不想释放锁。 有什么方法可以确保所有分配的任务都已完成?然后才继续?

根据 boost asio 文档,最简单的方法是调用 ioService.run():

The io_service object's run() function will not exit while work is underway. It does exit when there is no unfinished work remaining.

顺便说一下,如果不看更多程序就很难确定这一点,但看来您正试图破坏 asio 的主要目的。您正在序列化成批任务。如果在某种程度上重要的是批次#1 中的所有任务在批次#2 中的任何任务开始之前被完全处理,那么这可能是有道理的,但这是一种奇怪的用法。

另请注意,如果批处理 #1 任务的任何处理程序尝试添加新任务,它们可能会在尝试获取互斥体上的锁时发生死锁。

所以,我的最终解决方案是自己创建一个小信号灯,里面有一个 mutex 和真实的条件 我在这里找到的:

C++0x has no semaphores? How to synchronize threads?

我将这个信号量作为指向线程的指针传递,并在每次迭代时重置它。 我不得不稍微修改一下信号量代码以启用重置功能,并且因为我的线程有时会在主线程进入睡眠状态之前完成工作,所以我不得不修改 abit

中的条件
class semaphore
{
private:
    boost::mutex mutex_;
    boost::condition_variable condition_;
    unsigned long count_;

public:
    semaphore()
        : count_()
    {}

    void reset(int x)
    {
      count = x;
    }
    void notify()
    {
        boost::mutex::scoped_lock lock(mutex_);
        ++count_;
        if(count_ < 1)
            condition_.notify_one();
    }

    void wait()
    {
        boost::mutex::scoped_lock lock(mutex_);
        while(count_ > 1)
            condition_.wait(lock);

    }
};


....
....
semaphore*  m_semaphore = new semaphore();

while (true)
{
  { 
     boost::lock_guard<boost::recursive_mutex> lock(mutex_);
     map::iterator it = m_map.begin();
     if(it ! = m_map.end())
        m_semaphore->reset(m_map.size());  
     while (it != m_map.end())
     {
         ioService.post(boost::bind(&ProcessFun, this, it, &m_semaphore));
         ++it;    
     }

      m_semaphore.wait();
   }

}