为什么这个线程池死锁或 运行 太多次?

why does this thread pool deadlock or run too many times?

我正在尝试在 c++ 中编写满足以下条件的线程池:

以下是我目前所拥有的。它可以构建并且 运行 与 g++ test_tp.cpp -std=c++17 -lpthread; ./a.out 不幸的是它要么死锁要么工作太多(或有时太少)次。我认为这是因为 m_num_comps_done 不是线程安全的。有可能所有线程都跳过了最后一次计数,然后它们都结束了 yielding。但是这个变量不是原子的吗?

#include <vector>
#include <thread>
#include <mutex>
#include <shared_mutex>
#include <queue>
#include <atomic>
#include <future>

#include <iostream>
#include <numeric>

/**
 * @class join_threads
 * @brief RAII thread killer
 */
class join_threads
{
    std::vector<std::thread>& m_threads;
public:

    explicit join_threads(std::vector<std::thread>& threads_)
        : m_threads(threads_) {}

    ~join_threads() {
        for(unsigned long i=0; i < m_threads.size(); ++i) {
            if(m_threads[i].joinable())
                m_threads[i].join();
        }
    }
};


// how remove the first two template parameters ?
template<typename func_input_t, typename F>
class thread_pool
{

    using func_output_t = typename std::result_of<F(func_input_t)>::type;

    static_assert( std::is_floating_point<func_output_t>::value,  
            "function output type must be floating point");

    unsigned m_num_comps;
    std::atomic_bool m_done;
    std::atomic_bool m_has_an_input;
    std::atomic<int> m_num_comps_done; // need to be atomic? why?
    F m_f; // same function always used
    func_input_t m_param; // changed occasionally by a single writer
    func_output_t m_working_output; // many reader threads average all their output to get this
    std::promise<func_output_t> m_out;
    mutable std::shared_mutex m_mut;
    mutable std::mutex m_output_mut;
    std::vector<std::thread> m_threads;
    join_threads m_joiner;
    
    void worker_thread() {

        while(!m_done)
        {
            if(m_has_an_input){
                if( m_num_comps_done.load() < m_num_comps - 1 ) {
                    
                    std::shared_lock<std::shared_mutex> lk(m_mut);
                    func_output_t tmp = m_f(m_param); // long time
                    m_num_comps_done++;

                    // quick
                    std::lock_guard<std::mutex> lk2(m_output_mut);
                    m_working_output += tmp / m_num_comps;
                
                }else if(m_num_comps_done.load() == m_num_comps - 1){
                    
                    std::shared_lock<std::shared_mutex> lk(m_mut);
                    func_output_t tmp = m_f(m_param); // long time
                    m_num_comps_done++;

                    std::lock_guard<std::mutex> lk2(m_output_mut);
                    m_working_output += tmp / m_num_comps;
                    m_num_comps_done++;

                    try{                    
                        m_out.set_value(m_working_output);
                    }catch(std::future_error& e){
                        std::cout << "future_error caught: " << e.what() << "\n";
                    }

                }else{
                    std::this_thread::yield();
                }

            }else{
                  std::this_thread::yield();
            }
        }
    }

public:
   
    /**
     * @brief ctor spawns working threads
     */  
    thread_pool(F f, unsigned num_comps) 
        : m_num_comps(num_comps)
        , m_done(false)
        , m_has_an_input(false)
        , m_joiner(m_threads)
        , m_f(f) 
    {

        unsigned const thread_count=std::thread::hardware_concurrency(); // should I subtract one?

        try {
            for(unsigned i=0; i<thread_count; ++i) {
                m_threads.push_back( std::thread(&thread_pool::worker_thread, this));
            }
        } catch(...) {
            m_done=true;
            throw;
        }
    }

    ~thread_pool() {
        m_done=true;
    }


    /**
     * @brief changes the shared data member, 
     * resets the num_comps_left variable, 
     * resets the accumulator thing to 0, and
     * resets the promise object
     */
    std::future<func_output_t> start_work(func_input_t new_param) {
        std::unique_lock<std::shared_mutex> lk(m_mut);
        m_param = new_param;
        m_num_comps_done = 0;
        m_working_output = 0.0;
        m_out = std::promise<func_output_t>();
        m_has_an_input = true; // only really matters just after initialization
        return m_out.get_future();
    }
};


double slowSum(std::vector<double> nums) {
//    std::this_thread::sleep_for(std::chrono::milliseconds(200));
    return std::accumulate(nums.begin(), nums.end(), 0.0);
}

int main(){

    // construct
    thread_pool<std::vector<double>, std::function<double(std::vector<double>)>> 
        le_pool(slowSum, 1000);
    
    // add work
    auto ans = le_pool.start_work(std::vector<double>{1.2, 3.2, 4213.1}); 
    std::cout << "final answer is: " << ans.get() << "\n";
    std::cout << "it should be 4217.5\n";

    return 1;
}

您检查“完成”计数,然后获取锁。这允许多个线程等待锁。特别是,可能没有线程进入第二个 if 正文。

另一方面是因为所有线程一直处于 运行 状态,“最后一个”线程可能无法提前访问其独占部分(在足够多的线程具有 运行 之前) ) 甚至迟到(因为在第一个循环中有其他线程在互斥量处等待)。

要解决第一个问题,因为第二个 if 块具有与第一个 if 块中所有相同的代码,您可以只用一个块来检查计数以查看如果您已经到达终点并且应该设置输出值。

第二个问题需要您在获取互斥量后再次检查m_num_comps_done