C++:Boost.Asio:在新线程中启动 SSL 服务器会话
C++: Boost.Asio: Start SSL Server session in a new thread
我基于this example for the server编写了一对server/client程序,并且我完成了所有通信协议。服务器应该从多个客户端的多个连接接收多个连接,所以我想将会话彼此分开,我希望我可以用 std::thread
.
来做到这一点
这看起来很容易,但我完全不知道该怎么做。所有在线示例似乎都展示了如何 运行 并行执行一个函数,但似乎并未展示如何在新线程中创建对象。
我已经发表了一些评论来解释我对这个会话机制的理解。
我想使用的代码如下:
class server
{
public:
server(boost::asio::io_service& io_service, unsigned short port)
: io_service_(io_service),
acceptor_(io_service,
boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port)),
context_(io_service, boost::asio::ssl::context::sslv23)
{
//some code...
//notice the next lines here create the session object, and then recurs that to receive more connections
session* new_session = new session(io_service_, context_);
//this is called to accept more connections if available, the callback function is called with start() to start the session
acceptor_.async_accept(new_session->socket(),
boost::bind(&server::handle_accept, this, new_session,
boost::asio::placeholders::error));
}
void handle_accept(session* new_session, const boost::system::error_code& error)
{
if (!error)
{
//so the session starts here, and another object is created waiting for another session
new_session->start();
new_session = new session(io_service_, context_);
//now this function is, again, a call back function to make use of new_session, the new object that's waiting for a connection
acceptor_.async_accept(new_session->socket(),
boost::bind(&server::handle_accept, this, new_session,
boost::asio::placeholders::error));
}
else
{
delete new_session;
}
}
private:
boost::asio::io_service& io_service_;
boost::asio::ip::tcp::acceptor acceptor_;
boost::asio::ssl::context context_;
};
如何在新 std::thread
中创建这些会话?
如果您需要任何其他信息,请询问。谢谢。
我已将 的示例与您的示例代码混合使用。
它演示了相同的原理,但是 运行 io_service 在您的硬件支持的线程数上(即 thread::hardware_concurrency
)。
关键在于
- (共享)对象生命周期
- 线程安全
大多数 Asio 对象都不是线程安全的。因此,您需要同步访问它们。老式的互斥(std::mutex
等)在这种情况下效果不佳(因为你真的不想锁定每个完成处理程序并且你 reeeeeeeally不想在异步调用中保持锁定 ¹.
Boost Asio 针对这种情况有 strand
s 的概念:
- http://www.boost.org/doc/libs/1_58_0/doc/html/boost_asio/reference/io_service__strand.html
- Why do I need strand per connection when using boost::asio?
我选择了最简单的解决方案来对 "socket"(ssl stream/connection/session 或者你会在逻辑上引用它)进行所有操作。
除此之外,我还对 acceptor_
的所有访问权限进行了序列化处理。
A hybrid solution might move all the connections on a io_service
+pool and keep the listener (Server
) on a separate io_service
which could then be it's own implicit strand
注意:关于关机顺序:
- 我明确地销毁了
Server
,因此我们可以根据需要停止其 strand
(!!) 上的 acceptor_
。
pool
个线程只有在所有连接都关闭后才会完成。如果你想控制它,请再次查看链接的答案(它显示了如何保持弱指针跟踪连接)。或者,您可以为会话中的所有异步操作设置超时并检查 Server
是否有关闭信号。
演示代码
#include <boost/array.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <istream>
#include <list>
#include <string>
namespace bs = boost::system;
namespace ba = boost::asio;
namespace bas = ba::ssl;
using ba::ip::tcp;
using SslContext = boost::shared_ptr<bas::context>;
typedef ba::ip::tcp::acceptor acceptor_type;
typedef bas::stream<tcp::socket> stream_type;
const short PORT = 26767;
class Session : public boost::enable_shared_from_this<Session>
{
public:
typedef boost::shared_ptr<Session> Ptr;
Session(ba::io_service& svc, SslContext ctx) : strand_(svc), ctx_(ctx), stream(svc, *ctx) { }
virtual ~Session() {
std::cout << __PRETTY_FUNCTION__ << "\n";
}
stream_type::lowest_layer_type& socket() { return stream.lowest_layer(); }
void start() { AsyncReadString(); }
void Stop() { stream.shutdown(); }
protected:
ba::io_service::strand strand_;
SslContext ctx_;
stream_type stream;
ba::streambuf stream_buffer;
std::string message;
void AsyncReadString() {
std::cout << __PRETTY_FUNCTION__ << "\n";
ba::async_read_until(
stream,
stream_buffer,
'[=10=]', // null-char is a delimiter
strand_.wrap(
boost::bind(&Session::ReadHandler, shared_from_this(),
ba::placeholders::error,
ba::placeholders::bytes_transferred)));
}
void AsyncWriteString(const std::string &s) {
std::cout << __PRETTY_FUNCTION__ << "\n";
message = s;
ba::async_write(
stream,
ba::buffer(message.c_str(), message.size()+1),
strand_.wrap(
boost::bind(&Session::WriteHandler, shared_from_this(),
ba::placeholders::error,
ba::placeholders::bytes_transferred)));
}
std::string ExtractString() {
std::cout << __PRETTY_FUNCTION__ << "\n";
std::istream is(&stream_buffer);
std::string s;
std::getline(is, s, '[=10=]');
return s;
}
void ReadHandler(const bs::error_code &ec, std::size_t /*bytes_transferred*/)
{
std::cout << __PRETTY_FUNCTION__ << "\n";
if (!ec) {
std::cout << (ExtractString() + "\n");
std::cout.flush();
AsyncReadString(); // read again
}
else {
// do nothing, "this" will be deleted later
}
}
void WriteHandler(const bs::error_code &/*ec*/, std::size_t /*bytes_transferred*/) {
std::cout << __PRETTY_FUNCTION__ << "\n";
}
};
class Server : public boost::enable_shared_from_this<Server>
{
public:
Server(ba::io_service& io_service, unsigned short port) :
strand_ (io_service),
acceptor_(io_service, tcp::endpoint(tcp::v4(), port)),
context_ (boost::make_shared<bas::context>(io_service, bas::context::sslv23))
{
//
}
void start_accept() {
auto new_session = boost::make_shared<Session>(strand_.get_io_service(), context_);
acceptor_.async_accept(new_session->socket(),
strand_.wrap(boost::bind(&Server::handle_accept, this, new_session, ba::placeholders::error)));
}
void stop_accept() {
auto keep = shared_from_this();
strand_.post([keep] { keep->acceptor_.close(); });
}
void handle_accept(Session::Ptr new_session, const bs::error_code& error)
{
if (!error) {
new_session->start();
start_accept(); // uses `acceptor_` safely because of the strand_
}
}
~Server() {
std::cout << __PRETTY_FUNCTION__ << "\n";
}
private:
ba::io_service::strand strand_;
tcp::acceptor acceptor_;
SslContext context_;
};
int main() {
ba::io_service svc;
boost::thread_group pool;
{
auto s = boost::make_shared<Server>(svc, PORT);
s->start_accept();
for (auto i = 0u; i<boost::thread::hardware_concurrency(); ++i)
pool.create_thread([&]{svc.run();});
std::cerr << "Shutdown in 10 seconds...\n";
boost::this_thread::sleep_for(boost::chrono::seconds(10)); // auto-shutdown in 10s
std::cerr << "Shutdown...\n";
} // destructor of Server // TODO thread-safe
pool.join_all();
}
打印
$ (for a in {1..20000}; do (sleep 1.$RANDOM; echo -e "hello world $RANDOM\0" | netcat localhost 6767)& done)
$ time ./test | sort | uniq -c | sort -n | tail
Shutdown in 10 seconds...
Shutdown...
1 Server::~Server()
1 void Session::AsyncReadString()virtual Session::~Session()
1 void Session::AsyncReadString()void Session::ReadHandler(const boost::system::error_code&, std::size_t)
1 void Session::ReadHandler(const boost::system::error_code&, std::size_t)void Session::AsyncReadString()
3
4523 void Session::AsyncReadString()
4524 void Session::ReadHandler(const boost::system::error_code&, std::size_t)
4526 virtual Session::~Session()
real 0m10.128s
user 0m0.430s
sys 0m0.262s
¹ 异步的全部意义在于避免阻塞可能需要 "longer" 的 IO 操作。锁定的想法是永远不要锁定 "longer" 时间量,否则它们会破坏可伸缩性
我基于this example for the server编写了一对server/client程序,并且我完成了所有通信协议。服务器应该从多个客户端的多个连接接收多个连接,所以我想将会话彼此分开,我希望我可以用 std::thread
.
这看起来很容易,但我完全不知道该怎么做。所有在线示例似乎都展示了如何 运行 并行执行一个函数,但似乎并未展示如何在新线程中创建对象。
我已经发表了一些评论来解释我对这个会话机制的理解。
我想使用的代码如下:
class server
{
public:
server(boost::asio::io_service& io_service, unsigned short port)
: io_service_(io_service),
acceptor_(io_service,
boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port)),
context_(io_service, boost::asio::ssl::context::sslv23)
{
//some code...
//notice the next lines here create the session object, and then recurs that to receive more connections
session* new_session = new session(io_service_, context_);
//this is called to accept more connections if available, the callback function is called with start() to start the session
acceptor_.async_accept(new_session->socket(),
boost::bind(&server::handle_accept, this, new_session,
boost::asio::placeholders::error));
}
void handle_accept(session* new_session, const boost::system::error_code& error)
{
if (!error)
{
//so the session starts here, and another object is created waiting for another session
new_session->start();
new_session = new session(io_service_, context_);
//now this function is, again, a call back function to make use of new_session, the new object that's waiting for a connection
acceptor_.async_accept(new_session->socket(),
boost::bind(&server::handle_accept, this, new_session,
boost::asio::placeholders::error));
}
else
{
delete new_session;
}
}
private:
boost::asio::io_service& io_service_;
boost::asio::ip::tcp::acceptor acceptor_;
boost::asio::ssl::context context_;
};
如何在新 std::thread
中创建这些会话?
如果您需要任何其他信息,请询问。谢谢。
我已将
它演示了相同的原理,但是 运行 io_service 在您的硬件支持的线程数上(即 thread::hardware_concurrency
)。
关键在于
- (共享)对象生命周期
- 线程安全
大多数 Asio 对象都不是线程安全的。因此,您需要同步访问它们。老式的互斥(std::mutex
等)在这种情况下效果不佳(因为你真的不想锁定每个完成处理程序并且你 reeeeeeeally不想在异步调用中保持锁定 ¹.
Boost Asio 针对这种情况有 strand
s 的概念:
- http://www.boost.org/doc/libs/1_58_0/doc/html/boost_asio/reference/io_service__strand.html
- Why do I need strand per connection when using boost::asio?
我选择了最简单的解决方案来对 "socket"(ssl stream/connection/session 或者你会在逻辑上引用它)进行所有操作。
除此之外,我还对 acceptor_
的所有访问权限进行了序列化处理。
A hybrid solution might move all the connections on a
io_service
+pool and keep the listener (Server
) on a separateio_service
which could then be it's own implicit strand
注意:关于关机顺序:
- 我明确地销毁了
Server
,因此我们可以根据需要停止其strand
(!!) 上的acceptor_
。 pool
个线程只有在所有连接都关闭后才会完成。如果你想控制它,请再次查看链接的答案(它显示了如何保持弱指针跟踪连接)。或者,您可以为会话中的所有异步操作设置超时并检查Server
是否有关闭信号。
演示代码
#include <boost/array.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <istream>
#include <list>
#include <string>
namespace bs = boost::system;
namespace ba = boost::asio;
namespace bas = ba::ssl;
using ba::ip::tcp;
using SslContext = boost::shared_ptr<bas::context>;
typedef ba::ip::tcp::acceptor acceptor_type;
typedef bas::stream<tcp::socket> stream_type;
const short PORT = 26767;
class Session : public boost::enable_shared_from_this<Session>
{
public:
typedef boost::shared_ptr<Session> Ptr;
Session(ba::io_service& svc, SslContext ctx) : strand_(svc), ctx_(ctx), stream(svc, *ctx) { }
virtual ~Session() {
std::cout << __PRETTY_FUNCTION__ << "\n";
}
stream_type::lowest_layer_type& socket() { return stream.lowest_layer(); }
void start() { AsyncReadString(); }
void Stop() { stream.shutdown(); }
protected:
ba::io_service::strand strand_;
SslContext ctx_;
stream_type stream;
ba::streambuf stream_buffer;
std::string message;
void AsyncReadString() {
std::cout << __PRETTY_FUNCTION__ << "\n";
ba::async_read_until(
stream,
stream_buffer,
'[=10=]', // null-char is a delimiter
strand_.wrap(
boost::bind(&Session::ReadHandler, shared_from_this(),
ba::placeholders::error,
ba::placeholders::bytes_transferred)));
}
void AsyncWriteString(const std::string &s) {
std::cout << __PRETTY_FUNCTION__ << "\n";
message = s;
ba::async_write(
stream,
ba::buffer(message.c_str(), message.size()+1),
strand_.wrap(
boost::bind(&Session::WriteHandler, shared_from_this(),
ba::placeholders::error,
ba::placeholders::bytes_transferred)));
}
std::string ExtractString() {
std::cout << __PRETTY_FUNCTION__ << "\n";
std::istream is(&stream_buffer);
std::string s;
std::getline(is, s, '[=10=]');
return s;
}
void ReadHandler(const bs::error_code &ec, std::size_t /*bytes_transferred*/)
{
std::cout << __PRETTY_FUNCTION__ << "\n";
if (!ec) {
std::cout << (ExtractString() + "\n");
std::cout.flush();
AsyncReadString(); // read again
}
else {
// do nothing, "this" will be deleted later
}
}
void WriteHandler(const bs::error_code &/*ec*/, std::size_t /*bytes_transferred*/) {
std::cout << __PRETTY_FUNCTION__ << "\n";
}
};
class Server : public boost::enable_shared_from_this<Server>
{
public:
Server(ba::io_service& io_service, unsigned short port) :
strand_ (io_service),
acceptor_(io_service, tcp::endpoint(tcp::v4(), port)),
context_ (boost::make_shared<bas::context>(io_service, bas::context::sslv23))
{
//
}
void start_accept() {
auto new_session = boost::make_shared<Session>(strand_.get_io_service(), context_);
acceptor_.async_accept(new_session->socket(),
strand_.wrap(boost::bind(&Server::handle_accept, this, new_session, ba::placeholders::error)));
}
void stop_accept() {
auto keep = shared_from_this();
strand_.post([keep] { keep->acceptor_.close(); });
}
void handle_accept(Session::Ptr new_session, const bs::error_code& error)
{
if (!error) {
new_session->start();
start_accept(); // uses `acceptor_` safely because of the strand_
}
}
~Server() {
std::cout << __PRETTY_FUNCTION__ << "\n";
}
private:
ba::io_service::strand strand_;
tcp::acceptor acceptor_;
SslContext context_;
};
int main() {
ba::io_service svc;
boost::thread_group pool;
{
auto s = boost::make_shared<Server>(svc, PORT);
s->start_accept();
for (auto i = 0u; i<boost::thread::hardware_concurrency(); ++i)
pool.create_thread([&]{svc.run();});
std::cerr << "Shutdown in 10 seconds...\n";
boost::this_thread::sleep_for(boost::chrono::seconds(10)); // auto-shutdown in 10s
std::cerr << "Shutdown...\n";
} // destructor of Server // TODO thread-safe
pool.join_all();
}
打印
$ (for a in {1..20000}; do (sleep 1.$RANDOM; echo -e "hello world $RANDOM\0" | netcat localhost 6767)& done)
$ time ./test | sort | uniq -c | sort -n | tail
Shutdown in 10 seconds...
Shutdown...
1 Server::~Server()
1 void Session::AsyncReadString()virtual Session::~Session()
1 void Session::AsyncReadString()void Session::ReadHandler(const boost::system::error_code&, std::size_t)
1 void Session::ReadHandler(const boost::system::error_code&, std::size_t)void Session::AsyncReadString()
3
4523 void Session::AsyncReadString()
4524 void Session::ReadHandler(const boost::system::error_code&, std::size_t)
4526 virtual Session::~Session()
real 0m10.128s
user 0m0.430s
sys 0m0.262s
¹ 异步的全部意义在于避免阻塞可能需要 "longer" 的 IO 操作。锁定的想法是永远不要锁定 "longer" 时间量,否则它们会破坏可伸缩性