boost::asio::ip::tcp::acceptor 在使用 async_accept 接收到连接请求时终止应用程序
boost::asio::ip::tcp::acceptor terminates application when receiving connection request using async_accept
我想制作这个简单的服务器来侦听传入的连接请求、建立连接并发送一些数据。当我启动这个接受器时,它看起来工作正常,它等待那些传入的连接请求,但是当我的客户端尝试连接到这个接受器时,它会自动崩溃。我什至无法用 catch(...)
捕获任何异常
当我启动这个程序时,它在终端中看起来像这样
但是当我尝试连接时
客户端应用程序收到这种错误代码
我的 my_acceptor
class 有什么根本性的错误吗?
class my_acceptor{
public:
my_acceptor(asio::io_context& ios, unsigned short port_num) :
m_ios(ios),
port{port_num},
m_acceptor{ios}{}
//start accepting incoming connection requests
void Start()
{
std::cout << "Acceptor Start" << std::endl;
boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::tcp::v4(), port);
m_acceptor.open(endpoint.protocol());
m_acceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
m_acceptor.bind(endpoint);
m_acceptor.listen();
InitAccept();
}
void Stop(){}
private:
void InitAccept()
{
std::cout << "Acceptor InitAccept" << std::endl;
std::shared_ptr<asio::ip::tcp::socket> sock{new asio::ip::tcp::socket(m_ios)};
m_acceptor.async_accept(*sock.get(),
[this, sock](const boost::system::error_code& error)
{
onAccept(error, sock);
});
}
void onAccept(const boost::system::error_code& ec, std::shared_ptr<asio::ip::tcp::socket> sock)
{
std::cout << "Acceptor onAccept" << std::endl;
}
private:
unsigned short port;
asio::io_context& m_ios;
asio::ip::tcp::acceptor m_acceptor;
};
以防万一这是包装 my_acceptor
的 Server
代码
class Server{
public:
Server(){}
//start the server
void Start(unsigned short port_num, unsigned int thread_pool_size)
{
assert(thread_pool_size > 0);
//create specified number of threads and add them to the pool
for(unsigned int i = 0; i < thread_pool_size; ++i)
{
std::unique_ptr<std::thread> th(
new std::thread([this]()
{
m_ios.run();
}));
m_thread_pool.push_back(std::move(th));
}
//create and start acceptor
acc.reset(new my_acceptor(m_ios, port_num));
acc->Start();
}
//stop the server
void Stop()
{
work_guard.reset();
acc->Stop();
m_ios.stop();
for(auto& th : m_thread_pool)
{
th->join();
}
}
private:
asio::io_context m_ios;
boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work_guard = boost::asio::make_work_guard(m_ios);
std::unique_ptr<my_acceptor> acc;
std::vector<std::unique_ptr<std::thread>> m_thread_pool;
};
至少有一个线程错误。 tcp::acceptor
不是线程安全的,您(可能)运行 多个线程。因此,您将需要从链中完成受体访问。
my_acceptor(asio::io_context& ios, unsigned short port_num) :
m_ios(ios),
port{port_num},
m_acceptor{make_strand(ios)}{}
然后任何涉及它的操作都必须在那条链上。例如,缺少的 Stop()
代码应如下所示:
void Stop(){
post(m_acceptor.get_executor(), [this] { m_acceptor.cancel(); });
}
我按原样保留初始接受,因为此时不涉及多个线程。
Likewise in Start() and Stop() you should check whether acc
is null, because acc->Stop()
would throw and just replacing a running acc
would cause Undefined Behaviour due to deleting the instance that is still having async operations in flight.
在旁注中,如果停止 运行ning 接受器,则不需要 m_ios.stop()
。将来您可能必须发出停止任何客户端连接的信号,以便线程自然加入。
这是我完成接受循环的方式:
void onAccept(error_code ec, std::shared_ptr<tcp::socket> sock)
{
std::cout << "Acceptor onAccept " << ec.message() << " " << sock.get() << std::endl;
if (!ec) {
InitAccept();
}
}
请注意,除非套接字被取消(或其他错误),否则我们将继续接受。
我认为线程问题可能是您的大问题。我的建议生效后的结果:
#include <boost/asio.hpp>
#include <iostream>
#include <memory>
#include <thread>
using namespace std::chrono_literals;
namespace asio = boost::asio;
using boost::system::error_code;
using asio::ip::tcp;
class my_acceptor {
public:
my_acceptor(asio::io_context& ios, unsigned short port_num) :
m_ios(ios),
port{port_num},
m_acceptor{make_strand(ios)}{}
//start accepting incoming connection requests
void Start()
{
std::cout << "Acceptor Start" << std::endl;
tcp::endpoint endpoint(tcp::v4(), port);
m_acceptor.open(endpoint.protocol());
m_acceptor.set_option(tcp::acceptor::reuse_address(true));
m_acceptor.bind(endpoint);
m_acceptor.listen();
InitAccept();
}
void Stop(){
post(m_acceptor.get_executor(), [this] { m_acceptor.cancel(); });
}
private:
void InitAccept()
{
std::cout << "Acceptor InitAccept" << std::endl;
auto sock = std::make_shared<tcp::socket>(m_ios);
m_acceptor.async_accept(*sock,
[this, sock](error_code error) { onAccept(error, sock); });
}
void onAccept(error_code ec, const std::shared_ptr<tcp::socket>& sock)
{
std::cout << "Acceptor onAccept " << ec.message() << " " << sock.get() << std::endl;
if (!ec) {
InitAccept();
}
}
private:
asio::io_context& m_ios;
unsigned short port;
tcp::acceptor m_acceptor;
};
class Server{
public:
Server() = default;
//start the server
void Start(unsigned short port_num, unsigned int thread_pool_size)
{
assert(!acc); // otherwise UB results
assert(thread_pool_size > 0);
//create specified number of threads and add them to the pool
for(unsigned int i = 0; i < thread_pool_size; ++i)
{
std::unique_ptr<std::thread> th(
new std::thread([this]() { m_ios.run(); }));
m_thread_pool.push_back(std::move(th));
}
//create and start acceptor
acc = std::make_unique<my_acceptor>(m_ios, port_num);
acc->Start();
}
//stop the server
void Stop()
{
work_guard.reset();
if (acc) {
acc->Stop();
}
//m_ios.stop();
for(auto& th : m_thread_pool) {
th->join();
}
acc.reset();
}
private:
asio::io_context m_ios;
asio::executor_work_guard<asio::io_context::executor_type>
work_guard = make_work_guard(m_ios);
std::unique_ptr<my_acceptor> acc;
std::vector<std::unique_ptr<std::thread>> m_thread_pool;
};
int main() {
Server s;
s.Start(6868, 1);
std::this_thread::sleep_for(10s);
s.Stop();
}
使用 netcat 作为客户端进行测试:
for msg in one two three; do
sleep 1
nc 127.0.0.1 6868 <<< "$msg"
done
版画
Acceptor Start
Acceptor InitAccept
Acceptor onAccept Success 0x1f26960
Acceptor InitAccept
Acceptor onAccept Success 0x7f59f80009d0
Acceptor InitAccept
Acceptor onAccept Success 0x7f59f8000a50
Acceptor InitAccept
Acceptor onAccept Operation canceled 0x7f59f80009d0
我想制作这个简单的服务器来侦听传入的连接请求、建立连接并发送一些数据。当我启动这个接受器时,它看起来工作正常,它等待那些传入的连接请求,但是当我的客户端尝试连接到这个接受器时,它会自动崩溃。我什至无法用 catch(...)
当我启动这个程序时,它在终端中看起来像这样
但是当我尝试连接时
客户端应用程序收到这种错误代码
我的 my_acceptor
class 有什么根本性的错误吗?
class my_acceptor{
public:
my_acceptor(asio::io_context& ios, unsigned short port_num) :
m_ios(ios),
port{port_num},
m_acceptor{ios}{}
//start accepting incoming connection requests
void Start()
{
std::cout << "Acceptor Start" << std::endl;
boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::tcp::v4(), port);
m_acceptor.open(endpoint.protocol());
m_acceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
m_acceptor.bind(endpoint);
m_acceptor.listen();
InitAccept();
}
void Stop(){}
private:
void InitAccept()
{
std::cout << "Acceptor InitAccept" << std::endl;
std::shared_ptr<asio::ip::tcp::socket> sock{new asio::ip::tcp::socket(m_ios)};
m_acceptor.async_accept(*sock.get(),
[this, sock](const boost::system::error_code& error)
{
onAccept(error, sock);
});
}
void onAccept(const boost::system::error_code& ec, std::shared_ptr<asio::ip::tcp::socket> sock)
{
std::cout << "Acceptor onAccept" << std::endl;
}
private:
unsigned short port;
asio::io_context& m_ios;
asio::ip::tcp::acceptor m_acceptor;
};
以防万一这是包装 my_acceptor
Server
代码
class Server{
public:
Server(){}
//start the server
void Start(unsigned short port_num, unsigned int thread_pool_size)
{
assert(thread_pool_size > 0);
//create specified number of threads and add them to the pool
for(unsigned int i = 0; i < thread_pool_size; ++i)
{
std::unique_ptr<std::thread> th(
new std::thread([this]()
{
m_ios.run();
}));
m_thread_pool.push_back(std::move(th));
}
//create and start acceptor
acc.reset(new my_acceptor(m_ios, port_num));
acc->Start();
}
//stop the server
void Stop()
{
work_guard.reset();
acc->Stop();
m_ios.stop();
for(auto& th : m_thread_pool)
{
th->join();
}
}
private:
asio::io_context m_ios;
boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work_guard = boost::asio::make_work_guard(m_ios);
std::unique_ptr<my_acceptor> acc;
std::vector<std::unique_ptr<std::thread>> m_thread_pool;
};
至少有一个线程错误。 tcp::acceptor
不是线程安全的,您(可能)运行 多个线程。因此,您将需要从链中完成受体访问。
my_acceptor(asio::io_context& ios, unsigned short port_num) :
m_ios(ios),
port{port_num},
m_acceptor{make_strand(ios)}{}
然后任何涉及它的操作都必须在那条链上。例如,缺少的 Stop()
代码应如下所示:
void Stop(){
post(m_acceptor.get_executor(), [this] { m_acceptor.cancel(); });
}
我按原样保留初始接受,因为此时不涉及多个线程。
Likewise in Start() and Stop() you should check whether
acc
is null, becauseacc->Stop()
would throw and just replacing a runningacc
would cause Undefined Behaviour due to deleting the instance that is still having async operations in flight.
在旁注中,如果停止 运行ning 接受器,则不需要 m_ios.stop()
。将来您可能必须发出停止任何客户端连接的信号,以便线程自然加入。
这是我完成接受循环的方式:
void onAccept(error_code ec, std::shared_ptr<tcp::socket> sock)
{
std::cout << "Acceptor onAccept " << ec.message() << " " << sock.get() << std::endl;
if (!ec) {
InitAccept();
}
}
请注意,除非套接字被取消(或其他错误),否则我们将继续接受。
我认为线程问题可能是您的大问题。我的建议生效后的结果:
#include <boost/asio.hpp>
#include <iostream>
#include <memory>
#include <thread>
using namespace std::chrono_literals;
namespace asio = boost::asio;
using boost::system::error_code;
using asio::ip::tcp;
class my_acceptor {
public:
my_acceptor(asio::io_context& ios, unsigned short port_num) :
m_ios(ios),
port{port_num},
m_acceptor{make_strand(ios)}{}
//start accepting incoming connection requests
void Start()
{
std::cout << "Acceptor Start" << std::endl;
tcp::endpoint endpoint(tcp::v4(), port);
m_acceptor.open(endpoint.protocol());
m_acceptor.set_option(tcp::acceptor::reuse_address(true));
m_acceptor.bind(endpoint);
m_acceptor.listen();
InitAccept();
}
void Stop(){
post(m_acceptor.get_executor(), [this] { m_acceptor.cancel(); });
}
private:
void InitAccept()
{
std::cout << "Acceptor InitAccept" << std::endl;
auto sock = std::make_shared<tcp::socket>(m_ios);
m_acceptor.async_accept(*sock,
[this, sock](error_code error) { onAccept(error, sock); });
}
void onAccept(error_code ec, const std::shared_ptr<tcp::socket>& sock)
{
std::cout << "Acceptor onAccept " << ec.message() << " " << sock.get() << std::endl;
if (!ec) {
InitAccept();
}
}
private:
asio::io_context& m_ios;
unsigned short port;
tcp::acceptor m_acceptor;
};
class Server{
public:
Server() = default;
//start the server
void Start(unsigned short port_num, unsigned int thread_pool_size)
{
assert(!acc); // otherwise UB results
assert(thread_pool_size > 0);
//create specified number of threads and add them to the pool
for(unsigned int i = 0; i < thread_pool_size; ++i)
{
std::unique_ptr<std::thread> th(
new std::thread([this]() { m_ios.run(); }));
m_thread_pool.push_back(std::move(th));
}
//create and start acceptor
acc = std::make_unique<my_acceptor>(m_ios, port_num);
acc->Start();
}
//stop the server
void Stop()
{
work_guard.reset();
if (acc) {
acc->Stop();
}
//m_ios.stop();
for(auto& th : m_thread_pool) {
th->join();
}
acc.reset();
}
private:
asio::io_context m_ios;
asio::executor_work_guard<asio::io_context::executor_type>
work_guard = make_work_guard(m_ios);
std::unique_ptr<my_acceptor> acc;
std::vector<std::unique_ptr<std::thread>> m_thread_pool;
};
int main() {
Server s;
s.Start(6868, 1);
std::this_thread::sleep_for(10s);
s.Stop();
}
使用 netcat 作为客户端进行测试:
for msg in one two three; do
sleep 1
nc 127.0.0.1 6868 <<< "$msg"
done
版画
Acceptor Start
Acceptor InitAccept
Acceptor onAccept Success 0x1f26960
Acceptor InitAccept
Acceptor onAccept Success 0x7f59f80009d0
Acceptor InitAccept
Acceptor onAccept Success 0x7f59f8000a50
Acceptor InitAccept
Acceptor onAccept Operation canceled 0x7f59f80009d0