C++ 线程池不是 运行 并行

C++ ThreadPool is not running parallel

我已经尝试实现 ThreadPool,但不幸的是我 运行 遇到了一些问题。

这是我已有的。

//includes ...

void call()
{
    std::cout << "Hi i'm thread no " << std::this_thread::get_id() << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(2));
    std::cout << "ready " << std::this_thread::get_id() << std::endl;
};

//Implementation is not shown here to reduce code
class WorkQueue {
    public:
        bool push(std::function<void()> const& value);
        void pop();
        bool empty();
};

std::condition_variable g_queuecheck;
std::mutex              g_lockqueue;
std::atomic<bool>       g_notified;

class ThreadPool
{
    public:
        ThreadPool(int iNoThread) :
            m_noThread(iNoThread)
        {
            g_notified.store(false);
            m_threads.resize(iNoThread);
            bIsReady.store(false);
            for (int i = 0; i < iNoThread; ++i)
                m_threads[i] = std::thread(&ThreadPool::run, this);
        }

        void run()
        {
            while (!bIsReady || !m_workQueue.empty())
            {
                std::unique_lock<std::mutex> locker(g_lockqueue);
                if (m_workQueue.empty())
                {
                    while (!g_notified) // Used to avoid spurious wakeups
                    {
                        g_queuecheck.wait(locker);
                    }
                    if(!bIsReady)
                        g_notified.store(false);
                }

                m_workQueue.pop();
            }
        };

        void addWork(std::function<void()> func)
        {
            m_workQueue.push(func);
            g_notified.store(true);
            g_queuecheck.notify_one();
        }

        void join()
        {
            bIsReady.store(true);
            g_notified.store(true);
            g_queuecheck.notify_all();

            for (int i = 0; i < m_noThread; ++i)
                m_threads[i].join();
        }

        ~ThreadPool()
        {}


        WorkQueue m_workQueue;
        int m_noThread;
        std::vector<std::thread> m_threads;
        std::atomic<bool> bIsReady;
};

int _tmain(int argc, _TCHAR* argv[])
{
    {
        ThreadPool pool(4);

        for (int i = 0; i < 8; ++i)
            pool.addWork(call); //This work is done sequentially

        pool.join();
    }

    std::cin.ignore();
    return 0;
}

我的问题是工作是按顺序完成的。

  1. 我该如何解决这个问题?
  2. 我的线程池还有什么问题吗?
  3. 等待是最佳做法吗?
  1. How can I fix this?

您没有显示任何实际完成的工作,只有 pop 正在调用。假设确实执行了该函数,您应该注意当您执行此操作时您的 std::unique_lock 仍在范围内,因此它是同步的。您需要释放锁才能使调用并发,这意味着储物柜必须超出范围。

例如,

    { // begin locker scope
        std::unique_lock<std::mutex> locker(g_lockqueue);
        if (m_workQueue.empty())
        {

            while (!g_notified) // used to avoid spurious wakeups 
            {
                g_queuecheck.wait(locker);
            }
            if(!bIsReady)
                g_notified.store(false);
        }
    } // end locker scope
    m_workQueue.pop();

请注意,现在,您的 pop 方法将 运行 并行处理函数,但它也会并行改变队列。这是个问题。您需要执行以下操作:将函数移动到局部变量中,然后弹出,同时保持互斥量;然后在 locker 作用域外调用函数。

  1. Is something else wrong with my ThreadPool?

您使用的是全局变量,这很丑陋,而且您的变量命名不当:完全不清楚 g_notifiedbIsReady 的意思。

  1. Is the waiting best-practice?

没有。即使除了实际的错误,它的意图也是模糊的。

我用boost::asio来实现线程池。希望这可以帮助。此实现是从 Asio Thread Pool 收集的。让这个例子起作用的关键是确定 asio::io_service::work 的范围并让 join_all 在该范围之外。

#include <boost/chrono.hpp>
#include <boost/thread.hpp>
#include <boost/asio.hpp>
#include <boost/scoped_ptr.hpp>
#include <iostream>


boost::mutex output_mutex;
void call(size_t job_number)
{
    {
       boost::mutex::scoped_lock print_lock(output_mutex);
       std::cout << "Hi i'm job << " << job_number <<" and thread: " << boost::this_thread::get_id() << std::endl;
    }

    boost::this_thread::sleep_for(boost::chrono::seconds(2));
    {
       boost::mutex::scoped_lock print_lock(output_mutex);
       std::cout << "job " << job_number << " finished. thread " << boost::this_thread::get_id() << " ready." << std::endl;
    }
};

int main(int argc, char **argv)
{
    size_t number_of_threads = boost::thread::hardware_concurrency();

    // the number of jobs does not have to equal the number of
    // threads.  they will be processed in turn.
    size_t number_of_jobs = 3 * number_of_threads;

    boost::asio::io_service io_service;
    boost::thread_group threads;
    {
        boost::scoped_ptr< boost::asio::io_service::work > work( new boost::asio::io_service::work(io_service) );
        for(size_t t = 0; t < number_of_threads; t++)
        {
            threads.create_thread(boost::bind(&boost::asio::io_service::run, &io_service));
        }

        // post the jobs for work
        // notice that the thread id is reused
        for( size_t t = 0; t < number_of_jobs; t++ )
        {
            io_service.post(boost::bind(call,t) );
        }
    }
    threads.join_all();

    return 0;
}

问题出在 run 函数中:g_lockqueue 被锁定的时间过长。只要 locker 在范围内,它就会被锁定,因此 g_lockqueuepop 被调用时被锁定。
但是,因为popempty不能并发执行,所以需要先return然后"Work"加锁,然后释放锁,最后执行工作。

run 可能看起来像这样:

void run()
{
    while (!bIsReady || !m_workQueue.empty())
    {
        Work work;
        {
            std::unique_lock<std::mutex> locker(g_lockqueue);
            if (m_workQueue.empty())
            {

                while (!g_notified) // used to avoid spurious wakeups 
                {
                    g_queuecheck.wait(locker);
                }
                if(!bIsReady)
                    g_notified.store(false);
            }
            work = m_workQueue.pop();  // get the work to be done while locked
        } // g_lockqueue released here
        work.do();  // do the work
    }
};