提升线程和 shared_ptrs

Boost threads and shared_ptrs

我正在和朋友一起做一个业余项目,正在用 Boost 线程和 asio 构建一个线程池。我已经有了一个基本的池,并且 运行ning 但希望能够从池中删除线程。下面的 class 保留了一个向量 std::shared_ptr<boost::thread>

我遇到的问题是当我 运行 一个简单的测试时,程序永远不会退出。它似乎到达终点(见输出)但从未终止。我有一种感觉,它与仍然活着的线程有关,并试图杀死它们但无济于事。我做了一些关于清除共享指针向量的搜索,看起来你真的不应该这样做。同样,对 join_all()thread_group 调用应该加入并结束所有线程,至少我是这么认为的。

我仍在充分学习 boost threads、asio 和 shared_ptr,因此非常感谢任何帮助或建议。

编译:`

g++ -std=c++14 main.cpp -lboost_system -lboost_thread -lpthread

Class:

  class experimentalPool {
      private:
        boost::asio::io_service ioService;
        boost::asio::io_service::work work;
        boost::thread_group threads;
        std::vector<std::shared_ptr<boost::thread>> workers;
        std::size_t poolsize;
      public:
        experimentalPool(): work( ioService ) {
            poolsize = boost::thread::hardware_concurrency();
            for (size_t i = 0; i < poolsize; i++) {
                std::shared_ptr<boost::thread> t1(new boost::thread(
                    boost::bind(&boost::asio::io_service::run, &ioService)));
                threads.add_thread(t1.get());
                workers.push_back(std::move(t1));
            }
        }

        experimentalPool( std::size_t psize )
          : work( ioService ),
            poolsize( psize )
        {
            for (size_t i = 0; i < poolsize; i++) {
                std::shared_ptr<boost::thread> t1(new boost::thread(
                    boost::bind(&boost::asio::io_service::run, &ioService)));
                threads.add_thread(t1.get());
                workers.push_back(std::move(t1));
            }
        }

        template <typename F, typename... Args>
        void add_work(F f, Args... args){
            ioService.post(boost::bind(f, args...));
        };

        ~experimentalPool(){
          ioService.stop();
          try {
            std::cout << "here" << "\n";
            threads.join_all();
            //threads.interrupt_all();
            std::cout << "here" << "\n";
            //for (size_t i = 0; i < workers.size(); i++) {
              //(*workers[i]).interrupt();
            //}
          }
          catch ( const std::exception& ) {std::cout << "Caught Exception" << "\n";}
        }

        std::size_t size() const { return poolsize;}
    };

主线:

void add1(int& num){
    std::cout << ++num << std::endl;
}

int main(){

    int temp = 5;

    experimentalPool xpool(2);
    std::cout << "Xpool " << pool2.size() << "\n";
    xpool.add_work(add1, temp);

    std::this_thread::sleep_for (std::chrono::seconds(1));
    std::cout << "End" << std::endl;
    return 0;
}

输出:

Xpool 1
6
here
xpool here
xpool here
^C
ap@ubuntu:~/Desktop$ 

你的致命问题在这里:

std::shared_ptr<boost::thread> t1(new boost::thread(
    boost::bind(&boost::asio::io_service::run, &ioService)));
threads.add_thread(t1.get());

boost::thread_group(http://www.boost.org/doc/libs/1_42_0/doc/html/thread/thread_management.html#thread.thread_management.threadgroup.destructor)的文档来看,其析构函数的作用是

Destroy *this and delete all boost::thread objects in the group.

因此boost::thread_group将独家参与其线程的资源管理,您不能将这些线程存储在shared_ptr

要解决它,您需要仔细阅读 boost::thread_group 的链接文档,使用 create_thread() 并将其存储为原始指针。

老实说,我看了代码好几分钟,越来越担心。

为什么线程在两个集合中(thread_group 和矢量)?为什么线程属于另一个集合?为什么工作对象是永久的?你为什么要取消服务而不是让池子排干?

为什么 poolsize 被冗余存储?

为什么构造函数 99% 重复?

为什么 add1 采用可变引用,但绑定是按值?

简化

Live On Coliru

#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <thread>

namespace Experimental { 
    namespace ba = boost::asio;

    class Pool {
      private:
        ba::io_service ioService;
        boost::thread_group threads;
        boost::optional<ba::io_service::work> work { ioService };

      public:
        Pool(std::size_t psize = boost::thread::hardware_concurrency()) {
            for (size_t i = 0; i < psize; i++) {
                threads.create_thread(boost::bind(&ba::io_service::run, &ioService));
            }
        }

        template <typename F, typename... Args> void add_work(F f, Args... args) {
            ioService.post(boost::bind(f, args...));
        }

        ~Pool() {
            work.reset();
            threads.join_all();
        }

        std::size_t size() const { return threads.size(); }
    };
}

void add1(int &num) { std::cout << ++num << std::endl; }

int main() {
    int temp = 5;

    Experimental::Pool xpool(2);
    std::cout << "Xpool " << xpool.size() << "\n";
    xpool.add_work(add1, std::ref(temp));

    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "End" << std::endl;
}

版画

Xpool 2
6
End
temp: 6

而且它只占代码的 ~50%。