Call async_write from outside connect handler in asio

I am using the asio library to establish a TCP connection. The asynchronous read/write operations are done from the handler of async_accept:
mAcceptor.async_accept(*(mConnection->Socket()),
        boost::bind(&TCPConnection::HandleAcceptForWrite, this,
                    pI8Msg,
                    boost::asio::placeholders::error));

void TCPConnection::HandleAcceptForWrite(
        INT8* pI8Msg,
        const boost::system::error_code& err)
{
    if (!err) {
        TransmitData(pI8Msg); // --> Want to call this fn from outside the handler
    }
    SocketAcceptConnection(pI8Msg);
}

I want to avoid calling TransmitData (async_write) from inside the handler.

I intend to call the write from somewhere outside the accept handler. When I do that, I get the error 'Bad file descriptor'.

Is it always required to do the asynchronous write from within a handler? If it can be called from elsewhere, please share a code example.

You should provide the necessary context. For example: what is mConnection? Why does mConnection->Socket() return a pointer? If this is a server, why is there only one mConnection?

Spelunking With The Crystal Ball

Using my crystal ball, I will answer this question with the following assumptions:
  • mConnection is a shared pointer to an object that encapsulates the socket and some connection state
  • it is initialized with a fresh instance before each accept, always
  • therefore, unless something else shares ownership of *mConnection, it will by definition be destroyed as soon as mConnection is assigned a new instance.

Putting all of this together, there is only one reasonable interpretation: mConnection points to a type T that derives from enable_shared_from_this<T>, so that it can share ownership with itself. You should be able to see this in the TransmitData function, where a shared pointer should be captured in the bind expression (or lambda) for the async_write completion handler.

The purpose of this is to keep the connection alive, or in C++ terms: to extend/guarantee its lifetime until the completion handler returns. That completion handler may in turn start more work that shares ownership (captures the shared pointer), and so on, until the last operation owning the connection object loses interest and the connection object (T) is freed.
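
A minimal sketch of that idiom, assuming such a connection type (the names below are illustrative, not taken from the question):

    #include <boost/asio.hpp>
    #include <cstddef>
    #include <memory>
    #include <string>

    using boost::asio::ip::tcp;

    // Hypothetical Connection type demonstrating the shared_from_this idiom
    struct Connection : std::enable_shared_from_this<Connection> {
        explicit Connection(boost::asio::io_context& io) : mSocket(io) {}

        tcp::socket& Socket() { return mSocket; }

        void TransmitData(std::string msg) {
            auto self = shared_from_this(); // share ownership with ourselves
            auto buf  = std::make_shared<std::string>(std::move(msg));
            boost::asio::async_write(
                mSocket, boost::asio::buffer(*buf),
                [self, buf](boost::system::error_code, std::size_t) {
                    // `self` keeps the Connection alive until this handler has
                    // run; `buf` keeps the message buffer alive just as long
                });
        }

      private:
        tcp::socket mSocket;
    };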


What To Do?

You need to keep the connection alive even while it is idle. There are many ways to do this. You could insert a shared pointer to it into a "connection table" (e.g. std::vector<shared_ptr<Connection> >). The downside is that it becomes hard to prune connections: they are always owned, so they never get freed. (But see weak_ptr below!)
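
For contrast, here is a minimal sketch of both flavours of connection table; the full listing further down uses the weak_ptr variant:

    #include <list>
    #include <memory>
    #include <vector>

    struct Connection {}; // stand-in for the real connection type

    // Owning table: keeps every connection alive until it is explicitly erased
    std::vector<std::shared_ptr<Connection>> owning_table;

    // Observing table: connections manage their own lifetime, we merely watch
    std::list<std::weak_ptr<Connection>> observing_table;

    void prune() {
        // Drop entries whose connection has already been destroyed
        observing_table.remove_if(
            [](std::weak_ptr<Connection> const& w) { return w.expired(); });
    }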

In practice, I would make the connection itself (let's call the type Connection instead of T from now on) responsible: it may decide that the other side hung up, or an exchange may tell the connection to close, or it may even time out.

Because the latter is very common (connections often close automatically after some idle time), and it is also the most flexible (you can set the expiry to "infinite"), let's show that:
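
As an aside (a sketch, not part of the listing below), an "infinite" expiry can be expressed by arming the timer with the clock's maximum time point:

    #include <boost/asio.hpp>
    #include <chrono>

    using Clock = std::chrono::steady_clock;

    void ArmIdleTimer(boost::asio::steady_timer& timer, Clock::duration idle) {
        if (idle == Clock::duration::max())
            timer.expires_at(Clock::time_point::max()); // effectively "never"
        else
            timer.expires_after(idle);

        timer.async_wait([](boost::system::error_code ec) {
            if (ec != boost::asio::error::operation_aborted) {
                // idle period elapsed: this is where the connection would close
            }
        });
    }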

Side Note: while expanding your snippet into something working, I noticed that, weirdly, the acceptor appears to live INSIDE the connection type. That makes no sense, because how can you have multiple connections with a single acceptor ("server")? So I changed things around to be more typical.

Note that I still keep a table of connections (mConnections), but instead of storing owning pointers (like shared_ptr<>) I store weak_ptr<>, so that we can observe the connections without affecting their lifetime.

Live Demo

I took a different approach than in my usual answers, and pointed out a lot of the details in comments:

#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <chrono>
#include <cstdint>
#include <functional>
#include <iostream>
#include <list>
#include <memory>
#include <string_view>
#include <thread>

// Library facilities
namespace ba = boost::asio;
using boost::asio::ip::tcp;
using boost::system::error_code;
using namespace std::chrono_literals;

// Application types
using INT8     = std::int8_t;
using Executor = ba::io_context::executor_type;
using Duration = std::chrono::steady_clock::duration;

struct TCPConnection : public /*!*/ std::enable_shared_from_this<TCPConnection> {

    TCPConnection(Executor ex, Duration timeout = 120s)
        : mStrand(ex)
        , mSocket(mStrand)
        , mIdleTime(timeout)
        , mIdleTimer{mStrand}
    {
        // cannot use `shared_from_this` inside the constructor!
    }

    tcp::socket& Socket() { return mSocket; }

    ~TCPConnection() {
        // Demo output, to understand TCPConnection shared lifetime
        std::cerr << __PRETTY_FUNCTION__ << std::endl;
    }

    void StartSession() {
        // Here we start the idle timer, so we don't disappear until it
        // expires.

        // We don't post to the strand because we assume this is the very first
        // operation on this object (there can only be 1 thread aware of this
        // object)
        StartTimer();
    }

    void TransmitData(INT8 const* pI8Msg)
    {
        post(mStrand, [this, pI8Msg, self = shared_from_this()] {
            std::cout << "TransmitData on " << mSocket.remote_endpoint()
                      << std::endl;
            // Remove the following line to make the timeout also fire if the
            // writing takes too long
            mIdleTimer.cancel(); // We're busy, not idle

            // WARNING: assumes
            // - lifetime of pI8Msg guaranteed
            // - message NUL-terminated
            boost::asio::async_write(
                mSocket, ba::buffer(std::basic_string_view(pI8Msg)),
                boost::bind(&TCPConnection::OnWritten, shared_from_this(),
                            ba::placeholders::error(),
                            ba::placeholders::bytes_transferred()));
        });
    }

  private:
    void StartTimer() {
        mIdleTimer.expires_from_now(mIdleTime);
        mIdleTimer.async_wait(boost::bind(&TCPConnection::OnTimer,
                                          shared_from_this(),
                                          ba::placeholders::error()));
    }

    void OnTimer(error_code ec)
    {
        if (ec != ba::error::operation_aborted) // cancellation is not timeout
        {
            std::cout << "TCPConnection timeout, closing " << mSocket.remote_endpoint() << std::endl;
            // Timeout, let's respond by closing the socket and letting the
            // connection disappear

            // No need to post to the strand since the completion already
            // happens on the strand
            mSocket.cancel();
            mSocket.shutdown(tcp::socket::shutdown_both);

            // No more async operations so shared_from_this goes out of scope
        }
    }

    void OnWritten(error_code ec, size_t bytes_transferred)
    {
        if (ec.failed()) {
            std::cerr << "OnWritten: " << ec.message() << std::endl;
            // Error, we let the connection die

            // In case the timer wasn't canceled pre-write (see comment in
            // TransmitData) let's make sure it's canceled here
            mIdleTimer.cancel();
            // ignoring errors:
            mSocket.shutdown(tcp::socket::shutdown_both, ec);
            mSocket.close(ec);
            return; // error path: don't restart the timer on a dead connection
        }

        // write was successful, let's restart the timer
        std::cerr << "OnWritten: " << ec.message() << " (" << bytes_transferred
                  << " bytes)" << std::endl;

        StartTimer();
    }

    ba::strand<Executor> mStrand; // serialize execution
    tcp::socket          mSocket;
    Duration             mIdleTime;
    ba::steady_timer     mIdleTimer{mStrand, mIdleTime};
};

struct Server {
    Server(Executor ex, uint16_t port)
        : mExecutor(ex)
        , mAcceptor{make_strand(mExecutor), {{}, port}}
    {
        AcceptLoop();
    }

    void TransmitData(INT8 const* pI8Msg) {
        for (auto& weak : mConnections) {
            if (auto conn = weak.lock()) {
                conn->TransmitData(pI8Msg);
            }
        }
    }

  private:
    std::shared_ptr<TCPConnection> mConnection;
    std::list<std::weak_ptr<TCPConnection> > mConnections;

    void AcceptLoop()
    {
        mConnection = std::make_shared<TCPConnection>(mExecutor, 1s);

        mAcceptor.async_accept(mConnection->Socket(),
                               boost::bind(&Server::HandleAccept, this,
                                           boost::asio::placeholders::error()));
    }

    void HandleAccept(error_code err)
    {
        if (!err) {
            std::cerr << "HandleAccept: "
                      << mConnection->Socket().remote_endpoint() << std::endl;
            mConnection->StartSession(); // no transmit, just keeping
                                         // the connection open

            mConnections.push_back(
                mConnection); // store so we can send messages later

            // optional: garbage collect the mConnections table
            mConnections.remove_if(
                std::mem_fn(&std::weak_ptr<TCPConnection>::expired));

            // Keep accepting connections
            AcceptLoop();
        }
    }

    Executor      mExecutor;
    tcp::acceptor mAcceptor;
};

int main() {
    ba::io_context mIo;

    Server srv(mIo.get_executor(), 7979);

    std::thread delayed_transmission([&srv] {
        static INT8 const message[] =
            "HELLO WORLD\n"; // NUL-terminated and static!
        std::this_thread::sleep_for(3s);
        std::cout << "Sending '" << message << "'" << std::endl;
        srv.TransmitData(message);
    });

    // mIo.run();
    mIo.run_for(10s); // timelimited for Coliru

    delayed_transmission.join();
}

Here is the demo running live, showing connections that time out as well as successful TransmitData calls:

  • I set the timeout to 1s,
  • the server completes the transmission 3 seconds after startup (delayed_transmission),
  • I start an early batch of 3 connections, which time out,
  • I immediately start another batch of 3 connections, just in time to receive the delayed transmission,
  • the total server run time is limited to 10s for the live demo

Text Output

For easy reference (taken from http://coliru.stacked-crooked.com/a/def4f1927c7b975f):

  • Server

     ./a.out &
     HandleAccept: 127.0.0.1:43672
     HandleAccept: 127.0.0.1:43674
     HandleAccept: 127.0.0.1:43676
     TCPConnection timeout, closing 127.0.0.1:43672
     TCPConnection::~TCPConnection()
     TCPConnection timeout, closing 127.0.0.1:43674
     TCPConnection::~TCPConnection()
     TCPConnection timeout, closing 127.0.0.1:43676
     TCPConnection::~TCPConnection()
     HandleAccept: 127.0.0.1:43678
     HandleAccept: 127.0.0.1:43680
     HandleAccept: 127.0.0.1:43682
     Sending 'HELLO WORLD
     '
     TransmitData on 127.0.0.1:43678
     TransmitData on 127.0.0.1:43680
     TransmitData on 127.0.0.1:43682
     OnWritten: SuccessHELLO WORLD
      (12 bytes)
     OnWritten: Success (12 bytes)
     OnWritten: Success (12 bytes)
     TCPConnection timeout, closing 127.0.0.1:43678
     TCPConnection::~TCPConnection()
     TCPConnection timeout, closing 127.0.0.1:43680
     TCPConnection::~TCPConnection()
     TCPConnection timeout, closing 127.0.0.1:43682
     TCPConnection::~TCPConnection()
    
  • Client

     sleep 1
     (for a in {1..3}; do netcat 127.0.0.1 7979& done; time wait)
    
     real    0m1.011s
     user    0m0.012s
     sys 0m0.004s
    
     sleep 0.5
     (for a in {1..3}; do netcat 127.0.0.1 7979& done; time wait)
     HELLO WORLD
     HELLO WORLD
     HELLO WORLD
    
     real    0m1.478s
     user    0m0.008s
     sys 0m0.008s