shared_from_this 抛出 bad_weak_ptr 和 boost::asio

shared_from_this throws bad_weak_ptr with boost::asio

首先,我已经阅读了列出的所有相关问题。

他们说,"you must have an existing shared_ptr to this before you can use shared_from_this."据我所知,我绝不会违反该条件。我将 Foo 的实例创建为 shared_ptr 并强制将其始终创建为 shared_ptr。然后,我将 shared_ptr 存储在一个集合中。然而,当 shared_from_this 被调用时,我仍然得到 bad_weak_ptr 异常。

#pragma once

#include <memory>
#include <vector>

//--------------------------------------------------------------------
class Foo : std::enable_shared_from_this<Foo>
{
public:

    typedef std::shared_ptr<Foo> SharedPtr;

    // Ensure all instances are created as shared_ptr in order to fulfill requirements for shared_from_this
    static Foo::SharedPtr Create()
    {
        return Foo::SharedPtr(new Foo());
    };

    Foo(const Foo &) = delete;
    Foo(Foo &&) = delete;
    Foo & operator = (const Foo &) = delete;
    Foo & operator = (Foo &&) = delete;
    ~Foo() {};

    // We have to defer the start until we are fully constructed because we share_from_this()
    void Start()
    {
        DoStuff();
    }

private:

    Foo() {}

    void DoStuff()
    {
        auto self(shared_from_this());
    }
};

//--------------------------------------------------------------------
int main()
{
    std::vector<Foo::SharedPtr> foos;
    Foo::SharedPtr foo = Foo::Create();
    foos.emplace_back(foo);
    foo->Start();

    return 0;
}

您必须根据

public 说明符继承 enable_shared_from_this

Publicly inheriting from std::enable_shared_from_this provides the type T with a member function shared_from_this.

来自 http://en.cppreference.com/w/cpp/memory/enable_shared_from_this.

所以写

class Foo : public std::enable_shared_from_this<Foo>

首先,您在发布工作之前启动线程,因此 io_service::run() 很可能在 DoAccept 实际完成之前完成。

接下来,基数 class 必须是 PUBLIC 才能使 enable_shared_from_this 工作:

class Connection : public std::enable_shared_from_this<Connection> {

工作独立代码:

#include <iostream>
#include <mutex>
namespace SomeNamespace{
struct Logger {
    enum { LOGGER_SEVERITY_INFO };
    void Log(std::string const& msg, std::string const& file, unsigned line, int level) const {
        static std::mutex mx;
        std::lock_guard<std::mutex> lk(mx);
        std::cout << file << ":" << line << " level:" << level << " " << msg << "\n";
    }
    template <typename... Args>
    void LogF(std::string const& msg, Args const&... args) const {
        static std::mutex mx;
        std::lock_guard<std::mutex> lk(mx);
        static char buf[2048];
        snprintf(buf, sizeof(buf)-1, msg.c_str(), args...);
        std::cout << buf << "\n";
    }
    static Logger &GetInstance() {
        static Logger This;
        return This;
    }
};
} // namespace Somenamespace

#include <boost/asio.hpp>

#include <atomic>
#include <condition_variable>
#include <memory>

//--------------------------------------------------------------------
class ConnectionManager;

//--------------------------------------------------------------------
class Connection : public std::enable_shared_from_this<Connection> {
  public:
    typedef std::shared_ptr<Connection> SharedPtr;

    // Ensure all instances are created as shared_ptr in order to fulfill requirements for shared_from_this
    static Connection::SharedPtr Create(ConnectionManager *connectionManager, boost::asio::ip::tcp::socket &socket);

    Connection(const Connection &) = delete;
    Connection(Connection &&) = delete;
    Connection &operator=(const Connection &) = delete;
    Connection &operator=(Connection &&) = delete;
    ~Connection();

    // We have to defer the start until we are fully constructed because we share_from_this()
    void Start();
    void Stop();

    void Send(const std::vector<char> &data);

  private:
    ConnectionManager *m_owner;
    boost::asio::ip::tcp::socket m_socket;
    std::atomic<bool> m_stopped;
    boost::asio::streambuf m_receiveBuffer;
    mutable std::mutex m_sendMutex;
    std::shared_ptr<std::vector<boost::asio::const_buffer> > m_sendBuffers;
    bool m_sending;

    std::vector<char> m_allReadData; // for testing

    Connection(ConnectionManager *connectionManager, boost::asio::ip::tcp::socket socket);

    void DoReceive();
    void DoSend();
};

//--------------------------------------------------------------------

//#include "Connection.h"
//#include "ConnectionManager.h"
//**ConnectionManager.h **

//#pragma once

//#include "Connection.h"

// Boost Includes
#include <boost/asio.hpp>

// Standard Includes
#include <thread>
#include <vector>

//--------------------------------------------------------------------
class ConnectionManager {
  public:
    ConnectionManager(unsigned port, size_t numThreads);
    ConnectionManager(const ConnectionManager &) = delete;
    ConnectionManager(ConnectionManager &&) = delete;
    ConnectionManager &operator=(const ConnectionManager &) = delete;
    ConnectionManager &operator=(ConnectionManager &&) = delete;
    ~ConnectionManager();

    void Start();
    void Stop();

    void OnConnectionClosed(Connection::SharedPtr connection);

  protected:
    boost::asio::io_service m_io_service;
    boost::asio::ip::tcp::acceptor m_acceptor;
    boost::asio::ip::tcp::socket m_listenSocket;
    std::vector<std::thread> m_threads;

    mutable std::mutex m_connectionsMutex;
    std::vector<Connection::SharedPtr> m_connections;

    void IoServiceThreadProc();

    void DoAccept();
};

//--------------------------------------------------------------------

#include <boost/bind.hpp>

#include <algorithm>

//--------------------------------------------------------------------
Connection::SharedPtr Connection::Create(ConnectionManager *connectionManager, boost::asio::ip::tcp::socket &socket) {
    return Connection::SharedPtr(new Connection(connectionManager, std::move(socket)));
}

//--------------------------------------------------------------------
Connection::Connection(ConnectionManager *connectionManager, boost::asio::ip::tcp::socket socket)
        : m_owner(connectionManager), m_socket(std::move(socket)), m_stopped(false), m_receiveBuffer(), m_sendMutex(),
          m_sendBuffers(), m_sending(false), m_allReadData() {}

//--------------------------------------------------------------------
Connection::~Connection() {
    // Boost uses RAII, so we don't have anything to do. Let thier destructors take care of business
}

//--------------------------------------------------------------------
void Connection::Start() { DoReceive(); }

//--------------------------------------------------------------------
void Connection::Stop() {
    // The entire connection class is only kept alive, because it is a shared pointer and always has a ref count
    // as a consequence of the outstanding async receive call that gets posted every time we receive.
    // Once we stop posting another receive in the receive handler and once our owner release any references to
    // us, we will get destroyed.
    m_stopped = true;
    m_owner->OnConnectionClosed(shared_from_this());
}

//--------------------------------------------------------------------
void Connection::Send(const std::vector<char> &data) {
    std::lock_guard<std::mutex> lock(m_sendMutex);

    // If the send buffers do not exist, then create them
    if (!m_sendBuffers) {
        m_sendBuffers = std::make_shared<std::vector<boost::asio::const_buffer> >();
    }

    // Copy the data to be sent to the send buffers
    m_sendBuffers->emplace_back(boost::asio::buffer(data));

    DoSend();
}

//--------------------------------------------------------------------
void Connection::DoSend() {
    // According to the boost documentation, we cannot issue an async_write while one is already outstanding
    //
    // If that is the case, it is OK, because we've added the data to be sent to a new set of buffers back in
    // the Send method. Notice how the original buffer is moved, so therefore will be null below and how Send
    // will create new buffers and accumulate data to be sent until we complete in the lamda
    //
    // When we complete in the lamda, if we have any new data to be sent, we call DoSend once again.
    //
    // It is important though, that DoSend is only called from the lambda below and the Send method.

    if (!m_sending && m_sendBuffers) {
        m_sending = true;
        auto copy = std::move(m_sendBuffers);
        auto self(shared_from_this());

        boost::asio::async_write(m_socket, *copy,
             [self, copy](const boost::system::error_code &errorCode, size_t bytes_transferred) {
                 std::lock_guard<std::mutex> lock(self->m_sendMutex);
                 self->m_sending = false;

                 if (errorCode) {
                     // An error occurred
                     return;
                 }

                 self->DoSend();
             });
    }
}

//--------------------------------------------------------------------
void Connection::DoReceive() {
    SomeNamespace::Logger::GetInstance().Log(__PRETTY_FUNCTION__, __FILE__, __LINE__, SomeNamespace::Logger::LOGGER_SEVERITY_INFO);
    auto self(shared_from_this()); // ***EXCEPTION HERE****

    boost::asio::async_read_until(m_socket, m_receiveBuffer, '#',
      [self](const boost::system::error_code &errorCode, size_t bytesRead) {
          if (errorCode) {
              // Notify our masters that we are ready to be destroyed
              self->m_owner->OnConnectionClosed(self);

              // An error occured
              return;
          }

          // Grab the read data
          std::istream stream(&self->m_receiveBuffer);
          std::string data;
          std::getline(stream, data, '#');

          // Issue the next receive
          if (!self->m_stopped) {
              self->DoReceive();
          }
      });
}

//--------------------------------------------------------------------

//**ConnectionManager.cpp **

//#include "ConnectionManager.h"

//#include "Logger.h"

#include <boost/bind.hpp>

#include <system_error>

//------------------------------------------------------------------------------
ConnectionManager::ConnectionManager(unsigned port, size_t numThreads)
        : m_io_service(), m_acceptor(m_io_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port)),
          m_listenSocket(m_io_service), m_threads(numThreads) {}

//------------------------------------------------------------------------------
ConnectionManager::~ConnectionManager() { Stop(); }

//------------------------------------------------------------------------------
void ConnectionManager::Start() {
    if (m_io_service.stopped()) {
        m_io_service.reset();
    }

    DoAccept();

    for (auto &thread : m_threads) {
        if (!thread.joinable()) {
            thread = std::thread(&ConnectionManager::IoServiceThreadProc, this);
        }
    }
}

//------------------------------------------------------------------------------
void ConnectionManager::Stop() {
    {
        std::lock_guard<std::mutex> lock(m_connectionsMutex);
        m_connections.clear();
    }

    // TODO - Will the stopping of the io_service be enough to kill all the connections and ultimately have them get
    // destroyed?
    //        Because remember they have outstanding ref count to thier shared_ptr in the async handlers
    m_io_service.stop();

    for (auto &thread : m_threads) {
        if (thread.joinable()) {
            thread.join();
        }
    }
}

//------------------------------------------------------------------------------
void ConnectionManager::IoServiceThreadProc() {
    try {
        // Log that we are starting the io_service thread
        {
            const std::string msg("io_service socket thread starting.");
            SomeNamespace::Logger::GetInstance().Log(msg, __FILE__, __LINE__,
                                                      SomeNamespace::Logger::LOGGER_SEVERITY_INFO);
        }

        // Run the asynchronous callbacks from the socket on this thread
        // Until the io_service is stopped from another thread
        m_io_service.run();
    } catch (std::system_error &e) {
        SomeNamespace::Logger::GetInstance().LogF("System error caught in io_service socket thread. Error Code: %d", e.code().value());
    } catch (std::exception &e) {
        SomeNamespace::Logger::GetInstance().LogF("Standard exception caught in io_service socket thread. Exception: %s", e.what());
    } catch (...) {
        SomeNamespace::Logger::GetInstance().LogF("Unhandled exception caught in io_service socket thread.");
    }

    SomeNamespace::Logger::GetInstance().LogF("io_service socket thread exiting."); 
}

//------------------------------------------------------------------------------
void ConnectionManager::DoAccept() {
    SomeNamespace::Logger::GetInstance().Log(__PRETTY_FUNCTION__, __FILE__, __LINE__, SomeNamespace::Logger::LOGGER_SEVERITY_INFO);

    m_acceptor.async_accept(m_listenSocket, [this](const boost::system::error_code errorCode) {
        if (errorCode) {
            return;
        }

        {
            // Create the connection from the connected socket
            Connection::SharedPtr connection = Connection::Create(this, m_listenSocket);
            {
                std::lock_guard<std::mutex> lock(m_connectionsMutex);
                m_connections.push_back(connection);
                connection->Start();
            }
        }

        DoAccept();
    });
}

//------------------------------------------------------------------------------
void ConnectionManager::OnConnectionClosed(Connection::SharedPtr connection) {
    std::lock_guard<std::mutex> lock(m_connectionsMutex);

    auto itConnection = std::find(m_connections.begin(), m_connections.end(), connection);
    if (itConnection != m_connections.end()) {
        m_connections.erase(itConnection);
    }
}

//------------------------------------------------------------------------------
//**main.cpp**
//#include "ConnectionManager.h"

#include <cstring>
#include <iostream>
#include <string>

int main() {
    ConnectionManager connectionManager(4000, 2);
    connectionManager.Start();

    std::this_thread::sleep_for(std::chrono::minutes(1));

    connectionManager.Stop();
}