如何消除在飞行中摧毁 boost::asio 个实体时的崩溃?

How to eliminate crashes when destroying boost::asio entities on fly?

注意!!!问题是针对 boost::asio library 方面的专家的。不幸的是,我无法将代码做得更紧凑,它只包含描述问题的最少数量。该代码是示例,是人工创建的。它崩溃的地方已知并在评论中描述,它旨在说明崩溃! NO need 对调试代码有任何帮助...

问题是关于如何设计 asio 服务器,而不是关于它崩溃的地方!!!

此示例接近官方 boost::asio 文档中的“聊天服务器”设计。但是,与官方示例不同,只有连接 class 的对象是动态的 created/destroyed,在我的示例中,服务器及其连接 class 实体都是动态的 created/destroyed。 .. 我相信这种模式的实现应该在 asio 爱好者中广为人知,下面描述的问题应该已经有人解决了...

请看代码。 在这里,CAsioServer 和 CAsioConnection 的实体是动态创建和销毁的。

#include <map>
#include <array>
#include <set>
#include <vector>
#include <deque>
#include <thread>
#include <iostream>
#include <asio.hpp>
#include <iomanip>


class CAsioConnection
    : public std::enable_shared_from_this<CAsioConnection>
{
public:
    using PtrType = std::shared_ptr<CAsioConnection>;

    CAsioConnection(asio::ip::tcp::socket socket, std::set<CAsioConnection::PtrType>& connections)
        : socket_(std::move(socket)), connections_(connections)
    {
        std::cout << "-- CAsioConnection is creating, socket: " << socket_.native_handle() << "\n";
    }

    virtual ~CAsioConnection()
    {
        std::cout << "-- CAsioConnection is destroying , socket: " << socket_.native_handle() << "\n";
    }

    void read() { do_read(); }

private:
    void do_read(void)
    {
        uint8_t buff[3];

        asio::async_read(socket_, asio::buffer(buff,3),
            [this](std::error_code ec, std::size_t  /*length*/) {
            if (!ec)
            {
                do_read();
            }
            else
            {
                std::cout << "-- CAsioConnection::do_read() error : " << ec.message() << "\n";
                // Here is the crash N2
                connections_.erase(shared_from_this());
                // Crash may be fixed by the code below
                //if (ec.value() != 1236) // (winerror.h) #define ERROR_CONNECTION_ABORTED 1236L
                //  connections_.erase(shared_from_this());
            }
        });
    }

    asio::ip::tcp::socket socket_;
    std::set<CAsioConnection::PtrType>& connections_;
};

class CAsioServer
    : public std::enable_shared_from_this<CAsioServer>
{
public:
    using PtrType = std::shared_ptr<CAsioServer>;

    CAsioServer(int port, asio::io_context& io, const asio::ip::tcp::endpoint& endpoint)
        : port_(port), acceptor_(io, endpoint)
    {
        std::cout << "-- CAsioServer is creating, port: " << port_ << "\n";
    }

    virtual ~CAsioServer()
    {
        std::cout << "-- CAsioServer is destroying , port: " << port_ << "\n";
    }

    int port(void) { return port_; }

    void accept(void) { do_accept(); }
private:
    void do_accept()
    {
        acceptor_.async_accept([this](std::error_code ec, asio::ip::tcp::socket socket) {
            if (!ec)
            {
                std::cout << "-- CAsioServer::do_accept() connection to socket: " << socket.native_handle() << "\n";
                auto c = std::make_shared<CAsioConnection>(std::move(socket), connections_);
                connections_.insert(c);
                c->read();
            }
            else
            {
                // Here is the crash N1
                std::cout << "-- CAsioServer::do_accept() error : " << ec.message() << "\n";
                // Crash may be fixed by the code below
                //if (ec.value() == 995) // (winerror.h) #define ERROR_OPERATION_ABORTED 995L
                //  return;
            }
            // Actually here is the crash N1 )), but the fix is above...
            do_accept();
        });
    }

    int port_;
    asio::ip::tcp::acceptor acceptor_;
    std::set<CAsioConnection::PtrType> connections_;
};

//*****************************************************************************

class CTcpBase
{
public:
    CTcpBase()
    {
        // heart beat timer to keep it alive
        do_heart_beat();
        t_ = std::thread([this] {
            std::cout << "-- io context is RUNNING!!!\n";
            io_.run();
            std::cout << "-- io context has been STOPED!!!\n";
        });
    }

    virtual ~CTcpBase()
    {
        io_.stop();

        if (t_.joinable())
            t_.join();
    }

    void add_server(int port)
    {
        io_.post([this, port] 
        {
            for (auto s : servers_)
                if (port == s->port())
                    return;

            auto endpoint = asio::ip::tcp::endpoint(asio::ip::tcp::v4(), port);
            auto s = std::make_shared<CAsioServer>(port, io_, endpoint);
            s->accept();
            servers_.insert(s);
        });
    }

    void remove_server(int port)
    {
        io_.post([this, port] 
        {
            for (auto s : servers_)
                if (port == s->port())
                    { servers_.erase(s); return; }
        });
    }

private:

    void do_heart_beat(void)
    {
        std::cout << "-- beat\n";
        auto timer = std::make_shared<asio::steady_timer>(io_, asio::chrono::milliseconds(3000));
        timer->async_wait([timer, this](const asio::error_code& ec) {
            do_heart_beat();
        });
    }

    asio::io_context io_;
    std::thread t_;
    std::set<CAsioServer::PtrType> servers_;
};

//*****************************************************************************

int main(void)
{
    CTcpBase tcp_base;

    std::cout << "CONNECT the server to port 502\n";
    tcp_base.add_server(502);

    std::this_thread::sleep_for(std::chrono::seconds(20));
    
    std::cout << "REMOVE the server from port 502\n";
    tcp_base.remove_server(502);

    std::this_thread::sleep_for(std::chrono::seconds(10));

    return 0;
}

假设CTcpBase::add_server()CTcpBase::remove_server()会被不同线程的外部客户端调用。 asio 上下文在它自己的线程中处理它。 让我们考虑两种情况:

  1. 开始申请,稍等半分钟。 崩溃发生在 CAsioServer::do_accept() 见下面的输出。 Debug Console Output
  2. 开始申请。通过任何外部客户端连接到端口 502 并等待少于 20 秒。 崩溃发生在 CAsioConnection::do_read() 见下面的输出。 Debug Console Output

似乎 asio 框架调用推迟了 asio::async_read()acceptor_.async_accept() 处理程序,因为它的 class' 实体已经被销毁。

我已经通过错误检查修复了处理程序,但解决方案似乎并不可靠。谁知道还有什么其他的错误和场景……有时,当客户端断开连接时,我需要清理 asio::async_read() 设置的 connection_,我如何确定服务器或连接对象仍然存在?...

有什么方法可以询问 boost::asio 框架来防止为已经销毁的对象调用推迟的处理程序?或者如何通过错误码识别(be 100% sure)对象已经被销毁?或者我在 asio 的范围内还有其他解决方案或设计模式 - 如何在一个 运行 线程中动态处理 created/destroyed 服务器及其连接而没有互斥锁和其他东西...

首先检查您的 io_service 是否严格 single-threaded 运行。这从代码中是看不到的。如果不是,则共享状态(如 connections_)需要同步访问。

In fact you can have a logical strand in the form of the accept loop, but to take advantage of this you should make all accesses to connections_ happen there, see e.g.

  • here where we have the session list hold the sessions directly, no shared-pointer necessary at all:
  • or here where we do have shared pointers and we store weak pointers in the sessions list, that can be "garbage collected" from inside the accept loop:

更新

  • buff 是一个局部变量,这会导致未定义的行为,因为它在 async_read 操作的整个时间内都无效。

  • 一般来说,shared_from_this 惯用语 也保留一个共享指针的容器是没有意义的 已经决定了生命周期。

    您的问题似乎是有时 CAsioServer 被简单地销毁,这意味着 connections_ 的所有元素都被释放,那时它们的 CAsioConnection 对象可能被销毁。它还会破坏 CAsioServer.

    每当 Asio 对象被析构时,任何挂起的异步操作都将失败并显示 asio::error:operation_aborted,这确实意味着您已经响应。但是,当调用完成处理程序时,对象已经失效。

    中,我刚刚注意到缺少一个关键成分:你永远不会 capture/bind 指向 CAsioConnection 的共享指针完成处理程序.

    这是高度 un-idiomatic。

    相反,您使用共享指针来管理生命周期。如果您还需要一个连接列表,那么将其设为一个弱指针列表,这样它只会 观察 生命周期。

变化点:

  • 不需要做服务器enable_shared_from_this

  • connections_ 应该包含弱指针甚至 non-owning 指针。弱指针在这里显然要安全得多。事实上,您可以选择删除该容器,因为似乎没有任何东西在使用它。在下面的示例中,我选择保留它,以便您可以看到它的实际效果。

  • 在完成处理程序中捕获shared_from_this以确保对象在触发时仍然有效:

     asio::async_read(socket_, asio::buffer(buff,3),
         [this, self=shared_from_this()](error_code ec, std::size_t  /*length*/) {
    

简化

注意 我选择了 std::list 因为它消除了对 equality/ordering 的需要(见 std::owner_less<>)在 CAsioConnection class 中存储对容器的引用 - 使其循环依赖(CAsioConnection 类型在实例化 owner_less<> class 之前尚未完成)。我只是选择退出(不需要的?)复杂性。

Live On Coliru

#include <boost/asio.hpp>
#include <iostream>
#include <list>
#include <memory>

namespace asio = boost::asio;
using error_code = boost::system::error_code; // compat

class CAsioConnection : public std::enable_shared_from_this<CAsioConnection> {
  public:
    using PtrType = std::shared_ptr<CAsioConnection>;

    CAsioConnection(asio::ip::tcp::socket socket) : socket_(std::move(socket)) {
        log(__FUNCTION__);
    }

    ~CAsioConnection() { log(__FUNCTION__); }

    void read() { do_read(); }

  private:
    void log(std::string_view msg) const {
        error_code ec;
        std::clog << msg << ", socket: " << socket_.remote_endpoint(ec) << "\n";
    }

    uint8_t buff[256];
    void do_read() {
        asio::async_read(socket_, asio::buffer(buff),
             [this, self = shared_from_this()](error_code ec, std::size_t length) {
                 if (!ec) {
                     log(__FUNCTION__ + (" length: " + std::to_string(length)));
                     do_read();
                 } else {
                     log(__FUNCTION__ + (" error: " + ec.message()));
                 }
             });
    }

    asio::ip::tcp::socket socket_;
};

class CAsioServer {
  public:
    CAsioServer(asio::io_context& io, const asio::ip::tcp::endpoint& endpoint)
            : acceptor_(io, endpoint) { log(__FUNCTION__); }

    ~CAsioServer() { log(__FUNCTION__); }
    int port() const { return acceptor_.local_endpoint().port(); }
    void accept() { do_accept(); }

  private:
    void do_accept() {
        acceptor_.async_accept([this](error_code ec,
                                      asio::ip::tcp::socket socket) {
            if (!ec) {
                auto c = std::make_shared<CAsioConnection>(std::move(socket));
                connections_.push_back(c);
                c->read();
            } else {
                log(__FUNCTION__ + (" error: " + ec.message()));
            }

            connections_.remove_if(std::mem_fn(&WeakPtr::expired));

            if (acceptor_.is_open())
                do_accept();
        });
    }

    void log(std::string_view msg) const {
        std::clog << msg << ", port: " << port() << "\n";
    }

    asio::ip::tcp::acceptor acceptor_;
    using WeakPtr = std::weak_ptr<CAsioConnection>;
    std::list<WeakPtr> connections_;
};

int main() {
    boost::asio::io_context io;

    CAsioServer server(io, { {}, 7878 });
    server.accept();

    io.run_for(std::chrono::seconds(10));
}

输出:

./a.out& sleep 1; nc -w 1 127.0.0.1 7878 < main.cpp
CAsioServer, port: 7878
CAsioConnection, socket: 127.0.0.1:50628
operator() length: 256, socket: 127.0.0.1:50628
operator() length: 256, socket: 127.0.0.1:50628
operator() length: 256, socket: 127.0.0.1:50628
operator() length: 256, socket: 127.0.0.1:50628
operator() length: 256, socket: 127.0.0.1:50628
operator() length: 256, socket: 127.0.0.1:50628
operator() length: 256, socket: 127.0.0.1:50628
operator() length: 256, socket: 127.0.0.1:50628
operator() length: 256, socket: 127.0.0.1:50628
operator() error: End of file, socket: 127.0.0.1:50628
~CAsioConnection, socket: 127.0.0.1:50628
~CAsioServer, port: 7878

初级,亲爱的华生

The key of problem – I am very trusting person

我应该提到我使用 non-boost Asio 版本。 1.18.0,VS2017 和 Win10。因此,下面的所有解释都与 Asio 的 windows' 部分有关。 posix 的实现可能会有些不同。

最初实施的主要想法是:- 能够通过 adding/removing 来自适当 set<> 集合的对象来控制 server/connection 对象的数量。

下面的文字描述了为什么它在没有额外努力的情况下无法工作。

根据 Asio 文档:

~basic_stream_socket(); This function destroys the socket, cancelling any outstanding asynchronous operations associated with the socket as if by calling cancel.

我的错误是认为异步操作的取消将在析构函数范围内执行 同时调用异步处理程序

这很有趣,我想他们为什么要在异步处理程序中使用 self 指针,如果异步处理程序应该在对象的销毁阶段被拒绝。 正确答案——异步处理程序不会被拒绝))。

实际上,异步处理程序将在之后调用,class 实体到那时已经被销毁。

发生了什么:

  1. 当销毁服务器或连接时 class: 在 ~basic_stream_socket().
  2. 中为套接字句柄调用 WinSock2 ::closesocket()
  3. iocontext.run() 的下一次迭代中:win_iocp_io_context::do_one() 调用 ::GetQueuedCompletionStatus() 以获取异步操作结果并启动与已销毁套接字关联的异步处理程序。

有两个我们感兴趣的场景:

  1. Socket waits data.
  2. Socket is destroying (e.g. inside the connection class destructor).
  3. Async handler with error is called.

在这种情况下,即使 class 已经被销毁,我们也可以检查错误代码并关闭异步处理程序。我在我的问题的代码中演示了糟糕但有效的解决方案。

  1. Socket gets some data. Async handler was not started yet.
  2. Socket is destroying (e.g. inside the connection class destructor).
  3. Async handler is started WITHOUT ERRORS!!! Disaster.

在这种情况下,错误代码无法挽救我们。崩溃发生了。 因此,在异步处理程序中检查错误代码的方法不起作用。

下面的代码通过为服务器和连接 classes 引入 hasta_la_vista() 方法解决了所有问题。不是超级优雅但钢筋混凝土的解决方案:

#include <map>
#include <array>
#include <set>
#include <vector>
#include <deque>
#include <thread>
#include <iostream>
#include <asio.hpp>
#include <iomanip>


class CAsioConnection
    : public std::enable_shared_from_this<CAsioConnection>
{
public:
    using PtrType = std::shared_ptr<CAsioConnection>;

    CAsioConnection(asio::ip::tcp::socket socket, std::set<CAsioConnection::PtrType>& connections)
        : socket_(std::move(socket)), connections_(connections), destroying_in_progress(false)
    {
        std::cout << "-- CAsioConnection is creating\n";
    }

    virtual ~CAsioConnection()
    {
        std::cout << "-- CAsioConnection is destroying\n";
    }

    void read() { do_read(); }

    void hasta_la_vista(void)
    {
        destroying_in_progress = true;
        std::error_code ec;
        socket_.cancel(ec);
    }

private:
    void do_read(void)
    {
        auto self(shared_from_this());
        asio::async_read(socket_, asio::buffer(buff),
            [this, self](std::error_code ec, std::size_t  /*length*/) {

            if (destroying_in_progress)
                return;

            if (!ec)
            {
                do_read();
            }
            else
            {
                std::cout << "-- CAsioConnection::do_read() error : (" << ec.value() << ") " << ec.message() << "\n";
                hasta_la_vista();
                connections_.erase(shared_from_this());
            }
        });
    }

    uint8_t buff[3];
    asio::ip::tcp::socket socket_;
    bool destroying_in_progress;
    std::set<CAsioConnection::PtrType>& connections_;
};

//*****************************************************************************

class CAsioServer
    : public std::enable_shared_from_this<CAsioServer>
{
public:
    using PtrType = std::shared_ptr<CAsioServer>;

    CAsioServer(int port, asio::io_context& io, const asio::ip::tcp::endpoint& endpoint)
        : port_(port), destroying_in_progress(false), acceptor_(io, endpoint)
    {
        std::cout << "-- CAsioServer is creating, port: " << port_ << "\n";
    }

    virtual ~CAsioServer()
    {
        for (auto c : connections_)
        {
            c->hasta_la_vista();
        }

        std::cout << "-- CAsioServer is destroying , port: " << port_ << "\n";
    }

    int port(void) { return port_; }

    void accept(void) { do_accept(); }
    void hasta_la_vista(void) 
    { 
        destroying_in_progress = true;
        std::error_code ec;
        acceptor_.cancel(ec);
    }
private:
    void do_accept()
    {
        auto self(shared_from_this());
        acceptor_.async_accept([this, self](std::error_code ec, asio::ip::tcp::socket socket) {

            if (destroying_in_progress)
                return;

            if (!ec)
            {
                std::cout << "-- CAsioServer::do_accept() connection to socket: " << socket.native_handle() << "\n";
                auto c = std::make_shared<CAsioConnection>(std::move(socket), connections_);
                connections_.insert(c);
                c->read();
            }
            else
            {
                std::cout << "-- CAsioServer::do_accept() error : (" << ec.value() << ") "<<  ec.message() << "\n";
            }
            do_accept();
        });
    }

    int port_;
    bool destroying_in_progress;
    asio::ip::tcp::acceptor acceptor_;
    std::set<CAsioConnection::PtrType> connections_;
};

//*****************************************************************************

class CTcpBase
{
public:
    CTcpBase()
    {
        // heart beat timer to keep it alive
        do_heart_beat();
        t_ = std::thread([this] {
            std::cout << "-- io context is RUNNING!!!\n";
            io_.run();
            std::cout << "-- io context has been STOPED!!!\n";
        });
    }

    virtual ~CTcpBase()
    {
        io_.stop();

        if (t_.joinable())
            t_.join();
    }

    void add_server(int port)
    {
        io_.post([this, port] {
            for (auto& s : servers_)
                if (port == s->port())
                    return;

            auto endpoint = asio::ip::tcp::endpoint(asio::ip::tcp::v4(), port);
            auto s = std::make_shared<CAsioServer>(port, io_, endpoint);
            s->accept();
            servers_.insert(s);
        });
    }

    void remove_server(int port)
    {
        io_.post([this, port] {
            for (auto s : servers_)
                if (port == s->port())
                { 
                    s->hasta_la_vista();
                    servers_.erase(s); 
                    return; 
                }
        });
    }

private:

    void do_heart_beat(void)
    {
        std::cout << "-- beat\n";
        auto timer = std::make_shared<asio::steady_timer>(io_, asio::chrono::milliseconds(3000));
        timer->async_wait([timer, this](const std::error_code& ec) {
            do_heart_beat();
        });
    }

    asio::io_context io_;
    std::thread t_;
    std::set<CAsioServer::PtrType> servers_;
};

//*****************************************************************************

int main(void)
{
    CTcpBase tcp_base;

    std::cout << "CONNECT the server to port 502\n";
    tcp_base.add_server(502);

    std::this_thread::sleep_for(std::chrono::seconds(20));

    std::cout << "REMOVE the server from port 502\n";
    tcp_base.remove_server(502);

    std::this_thread::sleep_for(std::chrono::seconds(10));
    
    return 0;
}