C++ 线程池不是 运行 并行
C++ ThreadPool is not running parallel
我已经尝试实现 ThreadPool,但不幸的是我 运行 遇到了一些问题。
这是我已有的。
//includes ...
void call()
{
std::cout << "Hi i'm thread no " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(2));
std::cout << "ready " << std::this_thread::get_id() << std::endl;
};
//Implementation is not shown here to reduce code
class WorkQueue {
public:
bool push(std::function<void()> const& value);
void pop();
bool empty();
};
std::condition_variable g_queuecheck;
std::mutex g_lockqueue;
std::atomic<bool> g_notified;
class ThreadPool
{
public:
ThreadPool(int iNoThread) :
m_noThread(iNoThread)
{
g_notified.store(false);
m_threads.resize(iNoThread);
bIsReady.store(false);
for (int i = 0; i < iNoThread; ++i)
m_threads[i] = std::thread(&ThreadPool::run, this);
}
void run()
{
while (!bIsReady || !m_workQueue.empty())
{
std::unique_lock<std::mutex> locker(g_lockqueue);
if (m_workQueue.empty())
{
while (!g_notified) // Used to avoid spurious wakeups
{
g_queuecheck.wait(locker);
}
if(!bIsReady)
g_notified.store(false);
}
m_workQueue.pop();
}
};
void addWork(std::function<void()> func)
{
m_workQueue.push(func);
g_notified.store(true);
g_queuecheck.notify_one();
}
void join()
{
bIsReady.store(true);
g_notified.store(true);
g_queuecheck.notify_all();
for (int i = 0; i < m_noThread; ++i)
m_threads[i].join();
}
~ThreadPool()
{}
WorkQueue m_workQueue;
int m_noThread;
std::vector<std::thread> m_threads;
std::atomic<bool> bIsReady;
};
int _tmain(int argc, _TCHAR* argv[])
{
{
ThreadPool pool(4);
for (int i = 0; i < 8; ++i)
pool.addWork(call); //This work is done sequentially
pool.join();
}
std::cin.ignore();
return 0;
}
我的问题是工作是按顺序完成的。
- 我该如何解决这个问题?
- 我的线程池还有什么问题吗?
- 等待是最佳做法吗?
- How can I fix this?
您没有显示任何实际完成的工作,只有 pop
正在调用。假设确实执行了该函数,您应该注意当您执行此操作时您的 std::unique_lock
仍在范围内,因此它是同步的。您需要释放锁才能使调用并发,这意味着储物柜必须超出范围。
例如,
{ // begin locker scope
std::unique_lock<std::mutex> locker(g_lockqueue);
if (m_workQueue.empty())
{
while (!g_notified) // used to avoid spurious wakeups
{
g_queuecheck.wait(locker);
}
if(!bIsReady)
g_notified.store(false);
}
} // end locker scope
m_workQueue.pop();
请注意,现在,您的 pop 方法将 运行 并行处理函数,但它也会并行改变队列。这是个问题。您需要执行以下操作:将函数移动到局部变量中,然后弹出,同时保持互斥量;然后在 locker 作用域外调用函数。
- Is something else wrong with my ThreadPool?
您使用的是全局变量,这很丑陋,而且您的变量命名不当:完全不清楚 g_notified
和 bIsReady
的意思。
- Is the waiting best-practice?
没有。即使除了实际的错误,它的意图也是模糊的。
我用boost::asio来实现线程池。希望这可以帮助。此实现是从 Asio Thread Pool 收集的。让这个例子起作用的关键是确定 asio::io_service::work 的范围并让 join_all 在该范围之外。
#include <boost/chrono.hpp>
#include <boost/thread.hpp>
#include <boost/asio.hpp>
#include <boost/scoped_ptr.hpp>
#include <iostream>
boost::mutex output_mutex;
void call(size_t job_number)
{
{
boost::mutex::scoped_lock print_lock(output_mutex);
std::cout << "Hi i'm job << " << job_number <<" and thread: " << boost::this_thread::get_id() << std::endl;
}
boost::this_thread::sleep_for(boost::chrono::seconds(2));
{
boost::mutex::scoped_lock print_lock(output_mutex);
std::cout << "job " << job_number << " finished. thread " << boost::this_thread::get_id() << " ready." << std::endl;
}
};
int main(int argc, char **argv)
{
size_t number_of_threads = boost::thread::hardware_concurrency();
// the number of jobs does not have to equal the number of
// threads. they will be processed in turn.
size_t number_of_jobs = 3 * number_of_threads;
boost::asio::io_service io_service;
boost::thread_group threads;
{
boost::scoped_ptr< boost::asio::io_service::work > work( new boost::asio::io_service::work(io_service) );
for(size_t t = 0; t < number_of_threads; t++)
{
threads.create_thread(boost::bind(&boost::asio::io_service::run, &io_service));
}
// post the jobs for work
// notice that the thread id is reused
for( size_t t = 0; t < number_of_jobs; t++ )
{
io_service.post(boost::bind(call,t) );
}
}
threads.join_all();
return 0;
}
问题出在 run
函数中:g_lockqueue
被锁定的时间过长。只要 locker
在范围内,它就会被锁定,因此 g_lockqueue
在 pop
被调用时被锁定。
但是,因为pop
和empty
不能并发执行,所以需要先return然后"Work"加锁,然后释放锁,最后执行工作。
run
可能看起来像这样:
void run()
{
while (!bIsReady || !m_workQueue.empty())
{
Work work;
{
std::unique_lock<std::mutex> locker(g_lockqueue);
if (m_workQueue.empty())
{
while (!g_notified) // used to avoid spurious wakeups
{
g_queuecheck.wait(locker);
}
if(!bIsReady)
g_notified.store(false);
}
work = m_workQueue.pop(); // get the work to be done while locked
} // g_lockqueue released here
work.do(); // do the work
}
};
我已经尝试实现 ThreadPool,但不幸的是我 运行 遇到了一些问题。
这是我已有的。
//includes ...
void call()
{
std::cout << "Hi i'm thread no " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(2));
std::cout << "ready " << std::this_thread::get_id() << std::endl;
};
//Implementation is not shown here to reduce code
class WorkQueue {
public:
bool push(std::function<void()> const& value);
void pop();
bool empty();
};
std::condition_variable g_queuecheck;
std::mutex g_lockqueue;
std::atomic<bool> g_notified;
class ThreadPool
{
public:
ThreadPool(int iNoThread) :
m_noThread(iNoThread)
{
g_notified.store(false);
m_threads.resize(iNoThread);
bIsReady.store(false);
for (int i = 0; i < iNoThread; ++i)
m_threads[i] = std::thread(&ThreadPool::run, this);
}
void run()
{
while (!bIsReady || !m_workQueue.empty())
{
std::unique_lock<std::mutex> locker(g_lockqueue);
if (m_workQueue.empty())
{
while (!g_notified) // Used to avoid spurious wakeups
{
g_queuecheck.wait(locker);
}
if(!bIsReady)
g_notified.store(false);
}
m_workQueue.pop();
}
};
void addWork(std::function<void()> func)
{
m_workQueue.push(func);
g_notified.store(true);
g_queuecheck.notify_one();
}
void join()
{
bIsReady.store(true);
g_notified.store(true);
g_queuecheck.notify_all();
for (int i = 0; i < m_noThread; ++i)
m_threads[i].join();
}
~ThreadPool()
{}
WorkQueue m_workQueue;
int m_noThread;
std::vector<std::thread> m_threads;
std::atomic<bool> bIsReady;
};
int _tmain(int argc, _TCHAR* argv[])
{
{
ThreadPool pool(4);
for (int i = 0; i < 8; ++i)
pool.addWork(call); //This work is done sequentially
pool.join();
}
std::cin.ignore();
return 0;
}
我的问题是工作是按顺序完成的。
- 我该如何解决这个问题?
- 我的线程池还有什么问题吗?
- 等待是最佳做法吗?
- How can I fix this?
您没有显示任何实际完成的工作,只有 pop
正在调用。假设确实执行了该函数,您应该注意当您执行此操作时您的 std::unique_lock
仍在范围内,因此它是同步的。您需要释放锁才能使调用并发,这意味着储物柜必须超出范围。
例如,
{ // begin locker scope
std::unique_lock<std::mutex> locker(g_lockqueue);
if (m_workQueue.empty())
{
while (!g_notified) // used to avoid spurious wakeups
{
g_queuecheck.wait(locker);
}
if(!bIsReady)
g_notified.store(false);
}
} // end locker scope
m_workQueue.pop();
请注意,现在,您的 pop 方法将 运行 并行处理函数,但它也会并行改变队列。这是个问题。您需要执行以下操作:将函数移动到局部变量中,然后弹出,同时保持互斥量;然后在 locker 作用域外调用函数。
- Is something else wrong with my ThreadPool?
您使用的是全局变量,这很丑陋,而且您的变量命名不当:完全不清楚 g_notified
和 bIsReady
的意思。
- Is the waiting best-practice?
没有。即使除了实际的错误,它的意图也是模糊的。
我用boost::asio来实现线程池。希望这可以帮助。此实现是从 Asio Thread Pool 收集的。让这个例子起作用的关键是确定 asio::io_service::work 的范围并让 join_all 在该范围之外。
#include <boost/chrono.hpp>
#include <boost/thread.hpp>
#include <boost/asio.hpp>
#include <boost/scoped_ptr.hpp>
#include <iostream>
boost::mutex output_mutex;
void call(size_t job_number)
{
{
boost::mutex::scoped_lock print_lock(output_mutex);
std::cout << "Hi i'm job << " << job_number <<" and thread: " << boost::this_thread::get_id() << std::endl;
}
boost::this_thread::sleep_for(boost::chrono::seconds(2));
{
boost::mutex::scoped_lock print_lock(output_mutex);
std::cout << "job " << job_number << " finished. thread " << boost::this_thread::get_id() << " ready." << std::endl;
}
};
int main(int argc, char **argv)
{
size_t number_of_threads = boost::thread::hardware_concurrency();
// the number of jobs does not have to equal the number of
// threads. they will be processed in turn.
size_t number_of_jobs = 3 * number_of_threads;
boost::asio::io_service io_service;
boost::thread_group threads;
{
boost::scoped_ptr< boost::asio::io_service::work > work( new boost::asio::io_service::work(io_service) );
for(size_t t = 0; t < number_of_threads; t++)
{
threads.create_thread(boost::bind(&boost::asio::io_service::run, &io_service));
}
// post the jobs for work
// notice that the thread id is reused
for( size_t t = 0; t < number_of_jobs; t++ )
{
io_service.post(boost::bind(call,t) );
}
}
threads.join_all();
return 0;
}
问题出在 run
函数中:g_lockqueue
被锁定的时间过长。只要 locker
在范围内,它就会被锁定,因此 g_lockqueue
在 pop
被调用时被锁定。
但是,因为pop
和empty
不能并发执行,所以需要先return然后"Work"加锁,然后释放锁,最后执行工作。
run
可能看起来像这样:
void run()
{
while (!bIsReady || !m_workQueue.empty())
{
Work work;
{
std::unique_lock<std::mutex> locker(g_lockqueue);
if (m_workQueue.empty())
{
while (!g_notified) // used to avoid spurious wakeups
{
g_queuecheck.wait(locker);
}
if(!bIsReady)
g_notified.store(false);
}
work = m_workQueue.pop(); // get the work to be done while locked
} // g_lockqueue released here
work.do(); // do the work
}
};