C++:Boost.Asio:在新线程中启动 SSL 服务器会话

C++: Boost.Asio: Start SSL Server session in a new thread

我基于this example for the server编写了一对server/client程序,并且我完成了所有通信协议。服务器应该从多个客户端的多个连接接收多个连接,所以我想将会话彼此分开,我希望我可以用 std::thread.

来做到这一点

这看起来很容易,但我完全不知道该怎么做。所有在线示例似乎都展示了如何 运行 并行执行一个函数,但似乎并未展示如何在新线程中创建对象。

我已经发表了一些评论来解释我对这个会话机制的理解。

我想使用的代码如下:

class server
{
public:
  server(boost::asio::io_service& io_service, unsigned short port)
    : io_service_(io_service),
      acceptor_(io_service,
          boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port)),
      context_(io_service, boost::asio::ssl::context::sslv23)
  {
    //some code...


    //notice the next lines here create the session object, and then recurs that to receive more connections
    session* new_session = new session(io_service_, context_);

    //this is called to accept more connections if available, the callback function is called with start() to start the session
    acceptor_.async_accept(new_session->socket(),
        boost::bind(&server::handle_accept, this, new_session,
          boost::asio::placeholders::error));
  }

  void handle_accept(session* new_session, const boost::system::error_code& error)
  {
    if (!error)
    {
      //so the session starts here, and another object is created waiting for another session
      new_session->start();
      new_session = new session(io_service_, context_);
      //now this function is, again, a call back function to make use of new_session, the new object that's waiting for a connection
      acceptor_.async_accept(new_session->socket(),
          boost::bind(&server::handle_accept, this, new_session,
            boost::asio::placeholders::error));
    }
    else
    {
      delete new_session;
    }
  }

private:
  boost::asio::io_service& io_service_;
  boost::asio::ip::tcp::acceptor acceptor_;
  boost::asio::ssl::context context_;
};

如何在新 std::thread 中创建这些会话?

如果您需要任何其他信息,请询问。谢谢。

我已将 的示例与您的示例代码混合使用。

它演示了相同的原理,但是 运行 io_service 在您的硬件支持的线程数上(即 thread::hardware_concurrency)。

关键在于

  • (共享)对象生命周期
  • 线程安全

大多数 Asio 对象都不是线程安全的。因此,您需要同步访问它们。老式的互斥(std::mutex 等)在这种情况下效果不佳(因为你真的不想锁定每个完成处理程序并且你 reeeeeeeally不想在异步调用中保持锁定 ¹.

Boost Asio 针对这种情况有 strands 的概念:

我选择了最简单的解决方案来对 "socket"(ssl stream/connection/session 或者你会在逻辑上引用它)进行所有操作。

除此之外,我还对 acceptor_ 的所有访问权限进行了序列化处理。

A hybrid solution might move all the connections on a io_service+pool and keep the listener (Server) on a separate io_service which could then be it's own implicit strand

注意:关于关机顺序:

  • 我明确地销毁了 Server,因此我们可以根据需要停止其 strand(!!) 上的 acceptor_
  • pool 个线程只有在所有连接都关闭后才会完成。如果你想控制它,请再次查看链接的答案(它显示了如何保持弱指针跟踪连接)。或者,您可以为会话中的所有异步操作设置超时并检查 Server 是否有关闭信号。

演示代码

Live On Coliru

#include <boost/array.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <istream>
#include <list>
#include <string>

namespace bs  = boost::system;
namespace ba  = boost::asio;
namespace bas = ba::ssl;

using ba::ip::tcp;
using SslContext = boost::shared_ptr<bas::context>;

typedef ba::ip::tcp::acceptor    acceptor_type;
typedef bas::stream<tcp::socket> stream_type;

const short PORT = 26767;

class Session : public boost::enable_shared_from_this<Session>
{
public:
    typedef boost::shared_ptr<Session> Ptr;

    Session(ba::io_service& svc, SslContext ctx) : strand_(svc), ctx_(ctx), stream(svc, *ctx) { }

    virtual ~Session() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
    }

    stream_type::lowest_layer_type& socket() { return stream.lowest_layer(); } 
    void start()                             { AsyncReadString();          } 
    void Stop()                              { stream.shutdown();            } 

protected:
    ba::io_service::strand strand_;
    SslContext             ctx_;
    stream_type            stream;
    ba::streambuf          stream_buffer;
    std::string            message;

    void AsyncReadString() {
        std::cout << __PRETTY_FUNCTION__ << "\n";

        ba::async_read_until(
            stream,
            stream_buffer,
            '[=10=]', // null-char is a delimiter
            strand_.wrap(
                boost::bind(&Session::ReadHandler, shared_from_this(),
                    ba::placeholders::error,
                    ba::placeholders::bytes_transferred)));
    }
    void AsyncWriteString(const std::string &s) {
        std::cout << __PRETTY_FUNCTION__ << "\n";

        message = s;

        ba::async_write(
            stream,
            ba::buffer(message.c_str(), message.size()+1),
            strand_.wrap(
                boost::bind(&Session::WriteHandler, shared_from_this(),
                         ba::placeholders::error,
                         ba::placeholders::bytes_transferred)));
    }

    std::string ExtractString() {
        std::cout << __PRETTY_FUNCTION__ << "\n";

        std::istream is(&stream_buffer);
        std::string s;
        std::getline(is, s, '[=10=]');
        return s;
    }

    void ReadHandler(const bs::error_code &ec, std::size_t /*bytes_transferred*/) 
    {
        std::cout << __PRETTY_FUNCTION__ << "\n";

        if (!ec) {
            std::cout << (ExtractString() + "\n");
            std::cout.flush();
            AsyncReadString();  // read again
        }
        else {
            // do nothing, "this" will be deleted later
        }
    }

    void WriteHandler(const bs::error_code &/*ec*/, std::size_t /*bytes_transferred*/) {
        std::cout << __PRETTY_FUNCTION__ << "\n";
    }
};

class Server : public boost::enable_shared_from_this<Server>
{
  public:
    Server(ba::io_service& io_service, unsigned short port) :
        strand_  (io_service),
        acceptor_(io_service, tcp::endpoint(tcp::v4(), port)),
        context_ (boost::make_shared<bas::context>(io_service, bas::context::sslv23))
    {
        //
    }

    void start_accept() {
        auto new_session = boost::make_shared<Session>(strand_.get_io_service(), context_);

        acceptor_.async_accept(new_session->socket(),
                strand_.wrap(boost::bind(&Server::handle_accept, this, new_session, ba::placeholders::error)));
    }

    void stop_accept() {
        auto keep = shared_from_this();
        strand_.post([keep] { keep->acceptor_.close(); });
    }

    void handle_accept(Session::Ptr new_session, const bs::error_code& error)
    {
        if (!error) {
            new_session->start();
            start_accept(); // uses `acceptor_` safely because of the strand_
        }
    }

    ~Server() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
    }

  private:
    ba::io_service::strand strand_;
    tcp::acceptor          acceptor_;
    SslContext             context_;
};

int main() {
    ba::io_service svc;
    boost::thread_group pool;

    {
        auto s = boost::make_shared<Server>(svc, PORT);
        s->start_accept();

        for (auto i = 0u; i<boost::thread::hardware_concurrency(); ++i)
            pool.create_thread([&]{svc.run();});

        std::cerr << "Shutdown in 10 seconds...\n";
        boost::this_thread::sleep_for(boost::chrono::seconds(10)); // auto-shutdown in 10s

        std::cerr << "Shutdown...\n";
    } // destructor of Server // TODO thread-safe

    pool.join_all();
}

打印

$ (for a in {1..20000}; do (sleep 1.$RANDOM; echo -e "hello world $RANDOM\0" | netcat localhost 6767)& done)

$ time ./test | sort | uniq -c | sort -n | tail
Shutdown in 10 seconds...
Shutdown...
      1 Server::~Server()
      1 void Session::AsyncReadString()virtual Session::~Session()
      1 void Session::AsyncReadString()void Session::ReadHandler(const boost::system::error_code&, std::size_t)
      1 void Session::ReadHandler(const boost::system::error_code&, std::size_t)void Session::AsyncReadString()
      3 
   4523 void Session::AsyncReadString()
   4524 void Session::ReadHandler(const boost::system::error_code&, std::size_t)
   4526 virtual Session::~Session()

real    0m10.128s
user    0m0.430s
sys 0m0.262s

¹ 异步的全部意义在于避免阻塞可能需要 "longer" 的 IO 操作。锁定的想法是永远不要锁定 "longer" 时间量,否则它们会破坏可伸缩性