为什么这个线程池死锁或 运行 太多次?
why does this thread pool deadlock or run too many times?
我正在尝试在 c++
中编写满足以下条件的线程池:
- 一个 writer 偶尔会写入一个新的输入值,一旦写入,许多线程并发访问这个相同的值,并且每个线程都吐出一个随机的浮点数。
- 每个工作线程都使用相同的函数,因此没有理由为所有不同的函数构建一个线程安全的队列。我将常用函数存储在
thread_pool
class. 中
- 到目前为止,这些函数是程序中计算量最大的部分。任何阻止这些功能执行其工作的锁是我要避免的主要事情。
- 所有这些函数的浮点输出只是简单的平均。
- 用户有一个名为
thread_pool::start_work
的函数可以更改此共享输入,并告诉所有工作人员完成固定数量的任务。
thread_pool::start_work
returns std::future
以下是我目前所拥有的。它可以构建并且 运行 与 g++ test_tp.cpp -std=c++17 -lpthread; ./a.out
不幸的是它要么死锁要么工作太多(或有时太少)次。我认为这是因为 m_num_comps_done
不是线程安全的。有可能所有线程都跳过了最后一次计数,然后它们都结束了 yield
ing。但是这个变量不是原子的吗?
#include <vector>
#include <thread>
#include <mutex>
#include <shared_mutex>
#include <queue>
#include <atomic>
#include <future>
#include <iostream>
#include <numeric>
/**
* @class join_threads
* @brief RAII thread killer
*/
class join_threads
{
std::vector<std::thread>& m_threads;
public:
explicit join_threads(std::vector<std::thread>& threads_)
: m_threads(threads_) {}
~join_threads() {
for(unsigned long i=0; i < m_threads.size(); ++i) {
if(m_threads[i].joinable())
m_threads[i].join();
}
}
};
// how remove the first two template parameters ?
template<typename func_input_t, typename F>
class thread_pool
{
using func_output_t = typename std::result_of<F(func_input_t)>::type;
static_assert( std::is_floating_point<func_output_t>::value,
"function output type must be floating point");
unsigned m_num_comps;
std::atomic_bool m_done;
std::atomic_bool m_has_an_input;
std::atomic<int> m_num_comps_done; // need to be atomic? why?
F m_f; // same function always used
func_input_t m_param; // changed occasionally by a single writer
func_output_t m_working_output; // many reader threads average all their output to get this
std::promise<func_output_t> m_out;
mutable std::shared_mutex m_mut;
mutable std::mutex m_output_mut;
std::vector<std::thread> m_threads;
join_threads m_joiner;
void worker_thread() {
while(!m_done)
{
if(m_has_an_input){
if( m_num_comps_done.load() < m_num_comps - 1 ) {
std::shared_lock<std::shared_mutex> lk(m_mut);
func_output_t tmp = m_f(m_param); // long time
m_num_comps_done++;
// quick
std::lock_guard<std::mutex> lk2(m_output_mut);
m_working_output += tmp / m_num_comps;
}else if(m_num_comps_done.load() == m_num_comps - 1){
std::shared_lock<std::shared_mutex> lk(m_mut);
func_output_t tmp = m_f(m_param); // long time
m_num_comps_done++;
std::lock_guard<std::mutex> lk2(m_output_mut);
m_working_output += tmp / m_num_comps;
m_num_comps_done++;
try{
m_out.set_value(m_working_output);
}catch(std::future_error& e){
std::cout << "future_error caught: " << e.what() << "\n";
}
}else{
std::this_thread::yield();
}
}else{
std::this_thread::yield();
}
}
}
public:
/**
* @brief ctor spawns working threads
*/
thread_pool(F f, unsigned num_comps)
: m_num_comps(num_comps)
, m_done(false)
, m_has_an_input(false)
, m_joiner(m_threads)
, m_f(f)
{
unsigned const thread_count=std::thread::hardware_concurrency(); // should I subtract one?
try {
for(unsigned i=0; i<thread_count; ++i) {
m_threads.push_back( std::thread(&thread_pool::worker_thread, this));
}
} catch(...) {
m_done=true;
throw;
}
}
~thread_pool() {
m_done=true;
}
/**
* @brief changes the shared data member,
* resets the num_comps_left variable,
* resets the accumulator thing to 0, and
* resets the promise object
*/
std::future<func_output_t> start_work(func_input_t new_param) {
std::unique_lock<std::shared_mutex> lk(m_mut);
m_param = new_param;
m_num_comps_done = 0;
m_working_output = 0.0;
m_out = std::promise<func_output_t>();
m_has_an_input = true; // only really matters just after initialization
return m_out.get_future();
}
};
double slowSum(std::vector<double> nums) {
// std::this_thread::sleep_for(std::chrono::milliseconds(200));
return std::accumulate(nums.begin(), nums.end(), 0.0);
}
int main(){
// construct
thread_pool<std::vector<double>, std::function<double(std::vector<double>)>>
le_pool(slowSum, 1000);
// add work
auto ans = le_pool.start_work(std::vector<double>{1.2, 3.2, 4213.1});
std::cout << "final answer is: " << ans.get() << "\n";
std::cout << "it should be 4217.5\n";
return 1;
}
您检查“完成”计数,然后获取锁。这允许多个线程等待锁。特别是,可能没有线程进入第二个 if
正文。
另一方面是因为所有线程一直处于 运行 状态,“最后一个”线程可能无法提前访问其独占部分(在足够多的线程具有 运行 之前) ) 甚至迟到(因为在第一个循环中有其他线程在互斥量处等待)。
要解决第一个问题,因为第二个 if
块具有与第一个 if
块中所有相同的代码,您可以只用一个块来检查计数以查看如果您已经到达终点并且应该设置输出值。
第二个问题需要您在获取互斥量后再次检查m_num_comps_done
。
我正在尝试在 c++
中编写满足以下条件的线程池:
- 一个 writer 偶尔会写入一个新的输入值,一旦写入,许多线程并发访问这个相同的值,并且每个线程都吐出一个随机的浮点数。
- 每个工作线程都使用相同的函数,因此没有理由为所有不同的函数构建一个线程安全的队列。我将常用函数存储在
thread_pool
class. 中
- 到目前为止,这些函数是程序中计算量最大的部分。任何阻止这些功能执行其工作的锁是我要避免的主要事情。
- 所有这些函数的浮点输出只是简单的平均。
- 用户有一个名为
thread_pool::start_work
的函数可以更改此共享输入,并告诉所有工作人员完成固定数量的任务。 thread_pool::start_work
returnsstd::future
以下是我目前所拥有的。它可以构建并且 运行 与 g++ test_tp.cpp -std=c++17 -lpthread; ./a.out
不幸的是它要么死锁要么工作太多(或有时太少)次。我认为这是因为 m_num_comps_done
不是线程安全的。有可能所有线程都跳过了最后一次计数,然后它们都结束了 yield
ing。但是这个变量不是原子的吗?
#include <vector>
#include <thread>
#include <mutex>
#include <shared_mutex>
#include <queue>
#include <atomic>
#include <future>
#include <iostream>
#include <numeric>
/**
* @class join_threads
* @brief RAII thread killer
*/
class join_threads
{
std::vector<std::thread>& m_threads;
public:
explicit join_threads(std::vector<std::thread>& threads_)
: m_threads(threads_) {}
~join_threads() {
for(unsigned long i=0; i < m_threads.size(); ++i) {
if(m_threads[i].joinable())
m_threads[i].join();
}
}
};
// how remove the first two template parameters ?
template<typename func_input_t, typename F>
class thread_pool
{
using func_output_t = typename std::result_of<F(func_input_t)>::type;
static_assert( std::is_floating_point<func_output_t>::value,
"function output type must be floating point");
unsigned m_num_comps;
std::atomic_bool m_done;
std::atomic_bool m_has_an_input;
std::atomic<int> m_num_comps_done; // need to be atomic? why?
F m_f; // same function always used
func_input_t m_param; // changed occasionally by a single writer
func_output_t m_working_output; // many reader threads average all their output to get this
std::promise<func_output_t> m_out;
mutable std::shared_mutex m_mut;
mutable std::mutex m_output_mut;
std::vector<std::thread> m_threads;
join_threads m_joiner;
void worker_thread() {
while(!m_done)
{
if(m_has_an_input){
if( m_num_comps_done.load() < m_num_comps - 1 ) {
std::shared_lock<std::shared_mutex> lk(m_mut);
func_output_t tmp = m_f(m_param); // long time
m_num_comps_done++;
// quick
std::lock_guard<std::mutex> lk2(m_output_mut);
m_working_output += tmp / m_num_comps;
}else if(m_num_comps_done.load() == m_num_comps - 1){
std::shared_lock<std::shared_mutex> lk(m_mut);
func_output_t tmp = m_f(m_param); // long time
m_num_comps_done++;
std::lock_guard<std::mutex> lk2(m_output_mut);
m_working_output += tmp / m_num_comps;
m_num_comps_done++;
try{
m_out.set_value(m_working_output);
}catch(std::future_error& e){
std::cout << "future_error caught: " << e.what() << "\n";
}
}else{
std::this_thread::yield();
}
}else{
std::this_thread::yield();
}
}
}
public:
/**
* @brief ctor spawns working threads
*/
thread_pool(F f, unsigned num_comps)
: m_num_comps(num_comps)
, m_done(false)
, m_has_an_input(false)
, m_joiner(m_threads)
, m_f(f)
{
unsigned const thread_count=std::thread::hardware_concurrency(); // should I subtract one?
try {
for(unsigned i=0; i<thread_count; ++i) {
m_threads.push_back( std::thread(&thread_pool::worker_thread, this));
}
} catch(...) {
m_done=true;
throw;
}
}
~thread_pool() {
m_done=true;
}
/**
* @brief changes the shared data member,
* resets the num_comps_left variable,
* resets the accumulator thing to 0, and
* resets the promise object
*/
std::future<func_output_t> start_work(func_input_t new_param) {
std::unique_lock<std::shared_mutex> lk(m_mut);
m_param = new_param;
m_num_comps_done = 0;
m_working_output = 0.0;
m_out = std::promise<func_output_t>();
m_has_an_input = true; // only really matters just after initialization
return m_out.get_future();
}
};
double slowSum(std::vector<double> nums) {
// std::this_thread::sleep_for(std::chrono::milliseconds(200));
return std::accumulate(nums.begin(), nums.end(), 0.0);
}
int main(){
// construct
thread_pool<std::vector<double>, std::function<double(std::vector<double>)>>
le_pool(slowSum, 1000);
// add work
auto ans = le_pool.start_work(std::vector<double>{1.2, 3.2, 4213.1});
std::cout << "final answer is: " << ans.get() << "\n";
std::cout << "it should be 4217.5\n";
return 1;
}
您检查“完成”计数,然后获取锁。这允许多个线程等待锁。特别是,可能没有线程进入第二个 if
正文。
另一方面是因为所有线程一直处于 运行 状态,“最后一个”线程可能无法提前访问其独占部分(在足够多的线程具有 运行 之前) ) 甚至迟到(因为在第一个循环中有其他线程在互斥量处等待)。
要解决第一个问题,因为第二个 if
块具有与第一个 if
块中所有相同的代码,您可以只用一个块来检查计数以查看如果您已经到达终点并且应该设置输出值。
第二个问题需要您在获取互斥量后再次检查m_num_comps_done
。