如何仅在所有任务完成时使用 C++ 中的 boost 将新任务添加到线程池
How to add new tasks to threadpool only when all the tasks are finished using boost in c++
boost::asio::io_service ioService;
boost::thread_group threadpool;
boost::barrier barrier(5);
boost::asio::io_service::work work(ioService);
threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioService)); //Thread 1
threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioService)); //Thread 2
threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioService)); //Thread 3 threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioService)); //Thread 4
while (true)
{
{
boost::lock_guard<boost::recursive_mutex> lock(mutex_);
map::iterator it = m_map.begin();
while (it != m_map.end())
{
ioService.post(boost::bind(&ProcessFun, this, it));
++it;
}
ioService.run(); <-- main thread is stuck here..
}
}
我希望能够知道分配给线程池的所有任务都已经完成,只有在再次将任务分配给线程池之后。
只要线程正在处理任务我就不想释放锁。
有什么方法可以确保所有分配的任务都已完成?然后才继续?
根据 boost asio 文档,最简单的方法是调用 ioService.run():
The io_service object's run() function will not exit while work is
underway. It does exit when there is no unfinished work
remaining.
顺便说一下,如果不看更多程序就很难确定这一点,但看来您正试图破坏 asio 的主要目的。您正在序列化成批任务。如果在某种程度上重要的是批次#1 中的所有任务在批次#2 中的任何任务开始之前被完全处理,那么这可能是有道理的,但这是一种奇怪的用法。
另请注意,如果批处理 #1 任务的任何处理程序尝试添加新任务,它们可能会在尝试获取互斥体上的锁时发生死锁。
所以,我的最终解决方案是自己创建一个小信号灯,里面有一个 mutex
和真实的条件
我在这里找到的:
C++0x has no semaphores? How to synchronize threads?
我将这个信号量作为指向线程的指针传递,并在每次迭代时重置它。
我不得不稍微修改一下信号量代码以启用重置功能,并且因为我的线程有时会在主线程进入睡眠状态之前完成工作,所以我不得不修改 abit
中的条件
class semaphore
{
private:
boost::mutex mutex_;
boost::condition_variable condition_;
unsigned long count_;
public:
semaphore()
: count_()
{}
void reset(int x)
{
count = x;
}
void notify()
{
boost::mutex::scoped_lock lock(mutex_);
++count_;
if(count_ < 1)
condition_.notify_one();
}
void wait()
{
boost::mutex::scoped_lock lock(mutex_);
while(count_ > 1)
condition_.wait(lock);
}
};
....
....
semaphore* m_semaphore = new semaphore();
while (true)
{
{
boost::lock_guard<boost::recursive_mutex> lock(mutex_);
map::iterator it = m_map.begin();
if(it ! = m_map.end())
m_semaphore->reset(m_map.size());
while (it != m_map.end())
{
ioService.post(boost::bind(&ProcessFun, this, it, &m_semaphore));
++it;
}
m_semaphore.wait();
}
}
boost::asio::io_service ioService;
boost::thread_group threadpool;
boost::barrier barrier(5);
boost::asio::io_service::work work(ioService);
threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioService)); //Thread 1
threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioService)); //Thread 2
threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioService)); //Thread 3 threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioService)); //Thread 4
while (true)
{
{
boost::lock_guard<boost::recursive_mutex> lock(mutex_);
map::iterator it = m_map.begin();
while (it != m_map.end())
{
ioService.post(boost::bind(&ProcessFun, this, it));
++it;
}
ioService.run(); <-- main thread is stuck here..
}
}
我希望能够知道分配给线程池的所有任务都已经完成,只有在再次将任务分配给线程池之后。
只要线程正在处理任务我就不想释放锁。 有什么方法可以确保所有分配的任务都已完成?然后才继续?
根据 boost asio 文档,最简单的方法是调用 ioService.run():
The io_service object's run() function will not exit while work is underway. It does exit when there is no unfinished work remaining.
顺便说一下,如果不看更多程序就很难确定这一点,但看来您正试图破坏 asio 的主要目的。您正在序列化成批任务。如果在某种程度上重要的是批次#1 中的所有任务在批次#2 中的任何任务开始之前被完全处理,那么这可能是有道理的,但这是一种奇怪的用法。
另请注意,如果批处理 #1 任务的任何处理程序尝试添加新任务,它们可能会在尝试获取互斥体上的锁时发生死锁。
所以,我的最终解决方案是自己创建一个小信号灯,里面有一个 mutex
和真实的条件
我在这里找到的:
C++0x has no semaphores? How to synchronize threads?
我将这个信号量作为指向线程的指针传递,并在每次迭代时重置它。 我不得不稍微修改一下信号量代码以启用重置功能,并且因为我的线程有时会在主线程进入睡眠状态之前完成工作,所以我不得不修改 abit
中的条件class semaphore
{
private:
boost::mutex mutex_;
boost::condition_variable condition_;
unsigned long count_;
public:
semaphore()
: count_()
{}
void reset(int x)
{
count = x;
}
void notify()
{
boost::mutex::scoped_lock lock(mutex_);
++count_;
if(count_ < 1)
condition_.notify_one();
}
void wait()
{
boost::mutex::scoped_lock lock(mutex_);
while(count_ > 1)
condition_.wait(lock);
}
};
....
....
semaphore* m_semaphore = new semaphore();
while (true)
{
{
boost::lock_guard<boost::recursive_mutex> lock(mutex_);
map::iterator it = m_map.begin();
if(it ! = m_map.end())
m_semaphore->reset(m_map.size());
while (it != m_map.end())
{
ioService.post(boost::bind(&ProcessFun, this, it, &m_semaphore));
++it;
}
m_semaphore.wait();
}
}