asio async_send memory leak

I have the following snippet:

void TcpConnection::Send(const std::vector<uint8_t>& buffer) {
    std::shared_ptr<std::vector<uint8_t>> bufferCopy = std::make_shared<std::vector<uint8_t>>(buffer);

    auto socket = m_socket;

    m_socket->async_send(asio::buffer(bufferCopy->data(), bufferCopy->size()),
        [socket, bufferCopy](const boost::system::error_code& err, size_t bytesSent)
        {
            if (err)
            {
                logwarning << "clientcomms_t::sendNext encountered error: " << err.message();

                // Assume that the communications path is no longer
                // valid.
                socket->close();
            }
        });
}

This code causes a memory leak. If the m_socket->async_send call is commented out, there is no leak. I don't understand why bufferCopy isn't freed after the callback has been dispatched. What am I doing wrong? This is on Windows.

Since you don't show any relevant code, and the code shown doesn't contain a strict problem, I'm going to have to guess from the code smells.

The smell is that you have a TcpConnection class that is not derived from enable_shared_from_this<TcpConnection>. This makes me suspect you didn't plan ahead, because there is no sane way to continue using the instance after the completion of any asynchronous operation (like async_send).
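
For reference, the idiomatic pattern looks roughly like this (a minimal sketch with a hypothetical Connection class, not the asker's code): each asynchronous operation captures shared_from_this(), so the instance is kept alive until its handler has run:

#include <boost/asio.hpp>
#include <memory>
#include <vector>

struct Connection : std::enable_shared_from_this<Connection> {
    Connection(boost::asio::io_service& svc) : m_socket(svc) {}

    void do_write() {
        // requires that this object is owned by a shared_ptr
        // (e.g. created with std::make_shared<Connection>(svc))
        auto self = shared_from_this();
        boost::asio::async_write(m_socket, boost::asio::buffer(m_output),
            [self](boost::system::error_code ec, size_t /*bytesSent*/) {
                // `self` guarantees the Connection (and with it the socket
                // and the output buffer) is still alive in this handler
            });
    }

    boost::asio::ip::tcp::socket m_socket;
    std::vector<uint8_t> m_output;
};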

This leads me to suspect you have an extremely simple problem: your completion handler never runs. Only one situation can explain that, which leads me to assume you never run() the io_service instance.
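
The mechanism is easy to demonstrate in isolation (a minimal sketch, independent of sockets): handlers are only ever invoked from within run() (or poll()), and pending handlers are simply destroyed along with the service:

#include <boost/asio.hpp>
#include <iostream>

int main() {
    boost::asio::io_service svc;
    svc.post([] { std::cout << "handler ran\n"; });
    // Comment out the next line and nothing is printed: the handler is
    // never invoked, and is destroyed together with svc at scope exit.
    svc.run();
}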

Here's what that situation looks like, live:

Live On Coliru

#include <boost/asio.hpp>
namespace asio = boost::asio;
using asio::ip::tcp;

#include <cstring>
#include <iostream>
#include <vector>
auto& logwarning = std::clog;

struct TcpConnection {
    using Buffer = std::vector<uint8_t>;
    void Send(Buffer const &);

    TcpConnection(asio::io_service& svc) : m_socket(std::make_shared<tcp::socket>(svc)) {}
    tcp::socket& socket() const { return *m_socket; }
  private:
    std::shared_ptr<tcp::socket> m_socket;
};

void TcpConnection::Send(Buffer const &buffer) {
    auto bufferCopy = std::make_shared<Buffer>(buffer);
    auto socket     = m_socket;

    m_socket->async_send(asio::buffer(bufferCopy->data(), bufferCopy->size()),
         [socket, bufferCopy](const boost::system::error_code &err, size_t /*bytesSent*/) {
             if (err) {
                 logwarning << "clientcomms_t::sendNext encountered error: " << err.message();

                 // Assume that the communications path is no longer
                 // valid.
                 socket->close();
             }

         });
}

int main() {
    asio::io_service svc;
    tcp::acceptor a(svc, tcp::v4());
    a.bind({{}, 6767});
    a.listen();

    boost::system::error_code ec;
    do {
        TcpConnection conn(svc);
        a.accept(conn.socket(), ec);

        char const* greeting = "whale hello there!\n";
        conn.Send({greeting, greeting+strlen(greeting)});
    } while (!ec);
}

You'll see that any client connecting, e.g. with netcat localhost 6767, receives the greeting, after which, surprisingly, the connection stays open instead of being closed.

You would expect the server side to close the connection either way, because

  • if the async_send encountered a transmission error, the handler explicitly calls socket->close();
  • after the completion handler has run, it is destroyed, and the shared pointers captured in it are destroyed with it. This not only frees the copied buffer, but also runs the destructor of socket, which would close the connection (see the sketch after this list).
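
The destructor claim is easy to verify in isolation (a minimal sketch): destroying the last shared_ptr to a socket releases the underlying OS handle, which closes any established connection:

#include <boost/asio.hpp>
#include <iostream>
#include <memory>
using boost::asio::ip::tcp;

int main() {
    boost::asio::io_service svc;
    auto socket = std::make_shared<tcp::socket>(svc);
    socket->open(tcp::v4());
    std::cout << std::boolalpha << socket->is_open() << "\n"; // true
    socket.reset(); // ~socket() closes the handle; a peer would see EOF
}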

Since neither happens, this clearly confirms that the completion handler never runs. The fix is "easy": find a place to run the service:

// note: this version additionally needs #include <thread>
int main() {
    asio::io_service svc;
    tcp::acceptor a(svc, tcp::v4());
    a.set_option(tcp::acceptor::reuse_address());
    a.bind({{}, 6767});
    a.listen();

    std::thread th;

    {
        asio::io_service::work keep(svc); // prevent service running out of work early
        th = std::thread([&svc] { svc.run(); });

        boost::system::error_code ec;
        for (int i = 0; i < 11 && !ec; ++i) {
            TcpConnection conn(svc);
            a.accept(conn.socket(), ec);

            char const* greeting = "whale hello there!\n";
            conn.Send({greeting, greeting+strlen(greeting)});
        }
    }

    th.join();
}

This serves 11 connections and exits leak-free.
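
If you want to verify this yourself, running the program under a leak checker (e.g. valgrind on Linux, or AddressSanitizer via -fsanitize=address; on Windows the CRT debug heap can serve the same purpose) should report that all the copied buffers are reclaimed.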

Even better:

It gets much cleaner when the accept loop is asynchronous as well, and the TcpConnection is properly shared, as hinted above:

Live On Coliru

#include <boost/asio.hpp>
namespace asio = boost::asio;
using asio::ip::tcp;

#include <chrono>
#include <cstring>
#include <iostream>
#include <memory>
#include <thread>
#include <vector>
auto& logwarning = std::clog;

struct TcpConnection : std::enable_shared_from_this<TcpConnection> {
    using Buffer = std::vector<uint8_t>;

    TcpConnection(asio::io_service& svc) : m_socket(svc) {}

    void start() {
        char const* greeting = "whale hello there!\n";
        Send({greeting, greeting+strlen(greeting)});
    }

    void Send(Buffer);

  private:
    friend struct Server;
    Buffer m_output;
    tcp::socket m_socket;
};

struct Server {
    Server(unsigned short port) {
        _acceptor.set_option(tcp::acceptor::reuse_address());
        _acceptor.bind({{}, port});
        _acceptor.listen();

        do_accept();
    }

    ~Server() {
        keep.reset();
        _svc.post([this] { _acceptor.cancel(); });
        if (th.joinable())
            th.join();
    }

  private:
    void do_accept() {
        auto conn = std::make_shared<TcpConnection>(_svc);
        _acceptor.async_accept(conn->m_socket, [this,conn](boost::system::error_code ec) {
            if (ec)
                logwarning << "accept failed: " << ec.message() << "\n";
            else {
                conn->start();
                do_accept();
            }
        });
    }

    asio::io_service _svc;
    // prevent service running out of work early:
    std::unique_ptr<asio::io_service::work> keep{std::make_unique<asio::io_service::work>(_svc)};
    std::thread th{[this]{_svc.run();}}; // TODO handle handler exceptions

    tcp::acceptor _acceptor{_svc, tcp::v4()};
};

void TcpConnection::Send(Buffer buffer) {
    m_output  = std::move(buffer);
    auto self = shared_from_this();

    m_socket.async_send(asio::buffer(m_output),
         [self](const boost::system::error_code &err, size_t /*bytesSent*/) {
             if (err) {
                 logwarning << "clientcomms_t::sendNext encountered error: " << err.message() << "\n";
                 // not holding on to `self` means the socket gets closed
             }

             // do more with `self` which points to the TcpConnection instance...
         });
}

int main() {
    Server server(6868);
    std::this_thread::sleep_for(std::chrono::seconds(3));
}
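
Any client that connects during those 3 seconds (e.g. with netcat localhost 6868) receives the greeting; when the completion handler finishes, it releases self, the last reference to the TcpConnection goes away, and the destructor closes the socket, so the connection is cleanly shut down and nothing leaks.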