如何使用超时创建 boost::async_read 和 async_write

how to create boost::async_read and async_write with timeout

我正在尝试用 boost::asio 编写服务器。我希望 boost::asio::async_read 操作在没有数据到来时超时,但我不知道如何去做。这是我目前的代码:

// Reads exactly header_length bytes into the message buffer, then
// decodes the header and chains into do_read_body().
// NOTE(review): there is no timeout on this read -- that is exactly
// the problem being asked about.
void do_read_header() {
    auto self(shared_from_this());
    std::cout << "do_read_header\n";
    boost::asio::async_read(
        socket_, boost::asio::buffer(res.data(), res.header_length),
        [this, self](boost::system::error_code ec,
                     std::size_t length) {
            if (!ec && res.decode_header()) {
                do_read_body();
            }
            // NOTE(review): errors are silently dropped here -- on
            // failure the session just stops reading, without closing
            // or shutting down the socket.
        });
    // NOTE(review): do_write() is kicked off unconditionally alongside
    // the read -- presumably intended as a full-duplex session, but it
    // runs before any request data has arrived. Verify against the
    // (not shown) do_write() implementation.
    do_write();
}

// Reads the message body (length taken from the decoded header),
// forwards it to the interprocess queue, then loops back to
// do_read_header() for the next request.
void do_read_body() {
    auto self(shared_from_this());
    Message msg;  // NOTE(review): unused local -- res is used instead
    std::cout << "do_read_body\n";
    boost::asio::async_read(
        socket_, boost::asio::buffer(res.body(), res.body_length()),
        [this, self](boost::system::error_code ec,
                     std::size_t length) {
            // Zero-length bodies silently end the read chain.
            if (!length) {
                return;
            }
            if (!ec) {
                try {
                    // NOTE(review): body() is not NUL-terminated, so
                    // streaming it as char* risks reading past the
                    // buffer -- confirm.
                    std::cout << "read " << res.body() << "\n";
                    request_queue_.send(res.body(), res.body_length(),
                                        0);
                } catch (const std::exception& ex) {
                    std::cout << ex.what() << "\n";
                }
            } else {
                if (ec) {
                    std::cerr << "read error:" << ec.value()
                              << " message: " << ec.message() << "\n";
                }
                socket_.close();
            }
            // NOTE(review): reached on the error path too, so a new
            // read is started on the just-closed socket -- looks like
            // a bug in the original question code.
            do_read_header();
        });
}

// Entry point of the session: posts onto the strand so both the read
// and write chains start serialized on the session's executor.
void start() {
    post(strand_, [this, self = shared_from_this()] {
        do_read_header();
        do_write();
    });
}

// Accept loop (question version): accepts TCP connections on the given
// port and spawns one Session per connection.
class Server {
  public:
    Server(boost::asio::io_service& io_service, short port)
        : acceptor_(io_service, tcp::endpoint(tcp::v4(), port)),
          socket_(io_service) {
        do_accept();
    }

  private:
    // Standard asio accept pattern: the accepted socket is moved into
    // the new Session, leaving socket_ reusable for the next accept.
    void do_accept() {
        acceptor_.async_accept(
            socket_, [this](boost::system::error_code ec) {
                if (!ec) {
                    std::cout << "accept connection\n";

                    std::make_shared<Session>(std::move(socket_))
                        ->start();
                }

                // Re-arm unconditionally -- note this spins if the
                // acceptor itself errors (e.g. was closed).
                do_accept();
            });
    }

    tcp::acceptor acceptor_;
    tcp::socket socket_;
};

我在实现串行端口包装器时不得不解决类似的问题,这是代码的简化版本:

// Possible outcome of a read. Set by the completion callbacks and
// polled by the run_one() loop below; starts as ResultInProgress.
enum class ReadResult
{
    ResultInProgress,
    ResultSuccess,
    ResultError,
    ResultTimeout
};

// Blocking read with timeout, built from two racing async operations:
// a deadline_timer and an async_read_some. Whichever completes first
// sets `result`; the run_one() loop observes it and reacts.
// Returns the number of bytes read; throws TimeoutException on
// timeout and std::ios_base::failure on read error.
// NOTE(review): this is a "simplified version" -- io and port are
// local here and the port is never opened; in the real wrapper they
// are presumably members. Confirm before reusing as-is.
std::streamsize read(char* s, std::streamsize n, boost::posix_time::time_duration timeout)
{

    boost::asio::io_service io;
    boost::asio::serial_port port(io);
    // result is atomic to avoid race condition
    std::atomic<ReadResult> result(ReadResult::ResultInProgress); 
    std::streamsize bytesTransferred = 0;

    // Create async timer that fires after duration 
    boost::asio::deadline_timer timer(io);
    timer.expires_from_now(timeout);
    timer.async_wait(boost::bind([&result](const boost::system::error_code& error){
        // If there wasn't any error and reading is still in progress,
        // mark the result as timed out (a cancelled timer completes
        // with an error and is ignored here)
        if (!error && result == ReadResult::ResultInProgress) 
            result = ReadResult::ResultTimeout;
    },boost::asio::placeholders::error));

    // Read asynchronously
    port.async_read_some(boost::asio::buffer(s, n), boost::bind([&result,&bytesTransferred](const boost::system::error_code& error,
    const size_t transferred){
        // On clean completion record success and the byte count,
        // otherwise flag an error
        if (!error){
             result = ReadResult::ResultSuccess;
             bytesTransferred = transferred;
             return;
        }
        result = ReadResult::ResultError;
    },boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));

    // Run handlers one at a time until timeout, error or success occurs.
    // run_one() (not run()) matters: it returns after each handler so
    // `result` can be inspected between completions.
    for (;;)
    {
        io.run_one();
        switch (result)
        {
        case ReadResult::ResultSuccess:
            // Success, cancel timer and return amount of bytes read
            timer.cancel();
            return bytesTransferred;
        case ReadResult::ResultTimeout:
            // Timeout occurred, cancel read and throw exception
            port.cancel();
            throw(TimeoutException("Timeout expired"));
        case ReadResult::ResultError:
            // Error occurred, cancel read and timer and throw exception
            port.cancel();
            timer.cancel();
            throw(std::ios_base::failure("Error while reading"));
        default:
            // result still in progress: remain in the loop
            break;
        }
    }
}

所以基本上,您要做的是:

  1. 创建一个 io_service
  2. 用它初始化定时器
  3. 在定时器上调用 async_wait,回调函数在超时时设置 result 的超时标志
  4. 在套接字上调用 async_read,回调函数在读取成功时设置 result 的成功标志
  5. 循环直到 result 不再是"进行中"——注意,必须用 io.run_one() 来驱动这个循环
  6. 处理最终的 result

您可以将它用于任何异步函数

您可以添加一个截止时间计时器(deadline timer)来取消 IO 操作。取消是可以被观察到的,因为被取消的操作会以 error::operation_aborted 错误码完成调用:
// Arm the deadline; if it expires before being cancelled, cancel the
// pending socket operation (which then completes with
// error::operation_aborted). A cancelled timer arrives here with a
// non-zero ec and does nothing.
deadline_.expires_from_now(1s);
deadline_.async_wait([self, this] (error_code ec) {
    if (!ec) socket_.cancel();
});

我花了大约 45 分钟把你剩下的代码补全成一个 self-contained(自包含)的完整示例:

  • 在这个例子中,我假设我们
    • 想要等待最多 5 秒以等待新的 header 到达(因此在新的 session 启动后或直到下一个请求到达相同的 session)
    • 之后必须在 1 秒内收到完整信息

另请注意,我们避免关闭套接字 - 这是在 session 的析构函数中完成的。最好优雅关机。

Live Demo

#include <boost/asio.hpp>
#include <boost/endian/arithmetic.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <iomanip>
#include <iostream>

using namespace std::chrono_literals;
namespace bip = boost::interprocess;
using boost::asio::ip::tcp;
using boost::system::error_code;
// Interprocess queue shared between the server and consumer processes.
using Queue = boost::interprocess::message_queue;

// Capacity limits of the shared queue (and of a message body).
static constexpr auto MAX_MESG_LEN = 100;
static constexpr auto MAX_MESGS = 10;

// Wire message: a fixed-size buffer holding a 4-byte big-endian length
// header followed by up to MAX_MESG_LEN bytes of body.
struct Message {
    using Len = boost::endian::big_uint32_t;

    // On-wire header layout; big_uint32_t keeps it endian-portable.
    struct header_t {
        Len len;
    };
    static const auto header_length = sizeof(header_t);
    std::array<char, MAX_MESG_LEN + header_length> buf;

    char const* data() const { return buf.data();             } 
    char*       data()       { return buf.data();             } 
    char const* body() const { return data() + header_length; } 
    char*       body()       { return data() + header_length; }

    // Required so that reinterpreting the raw buffer as header_t (see
    // h() below) is well-defined for this trivially-copyable layout.
    static_assert(std::is_standard_layout_v<header_t> and
                  std::is_trivial_v<header_t>);

    // Body length as claimed by the header, clamped to the buffer.
    Len body_length() const     { return std::min(h().len, max_body_length()); } 
    Len max_body_length() const { return buf.max_size() - header_length;       } 
    // A header is valid iff its claimed length fits in the buffer.
    bool decode_header()        { return h().len <= max_body_length();         } 

    // Copies `value` into the body and stamps the header.
    // Returns false if the value had to be truncated.
    bool set_body(std::string_view value) {
        assert(value.length() <= max_body_length());
        h().len = value.length();
        std::copy_n(value.begin(), body_length(), body());

        return (value.length() == body_length()); // not truncated
    }

  private:
    // View the front of the buffer as the header struct.
    header_t&       h()       { return *reinterpret_cast<header_t*>(data());       } 
    header_t const& h() const { return *reinterpret_cast<header_t const*>(data()); }
};

// One per-connection session. Lifetime is managed through
// shared_from_this captures in the handlers; the socket is closed by
// the destructor (graceful shutdown, never an explicit close()).
struct Session : std::enable_shared_from_this<Session> {
    // Takes ownership of an already-accepted socket.
    Session(tcp::socket&& s) : socket_(std::move(s)) {}

    // Kick off the first header read, serialized on the strand.
    void start() {
        post(strand_,
             [ this, self = shared_from_this() ] { do_read_header(); });
    }

private:
    using Strand = boost::asio::strand<tcp::socket::executor_type>;
    using Timer  = boost::asio::steady_timer;

    // socket_ is always initialized by the constructor. (A previous
    // default member initializer `socket_{strand_}` referenced strand_
    // before its construction -- members initialize in declaration
    // order -- and has been removed.)
    tcp::socket socket_;
    Strand      strand_{make_strand(socket_.get_executor())};
    Message     res;
    Queue       request_queue_{bip::open_or_create, "SendQueue", MAX_MESGS, MAX_MESG_LEN};
    Timer       recv_deadline_{strand_};

    // Wait up to 5s for the next request header (covers both a fresh
    // session and the idle gap between requests); on timeout cancel the
    // socket so the pending read completes with operation_aborted.
    void do_read_header() {
        auto self(shared_from_this());
        std::cout << "do_read_header: " << res.header_length << std::endl;
        recv_deadline_.expires_after(5s);
        recv_deadline_.async_wait([ self, this ](error_code ec) {
            // A cancelled timer (read finished first) arrives with
            // operation_aborted and is ignored.
            if (!ec) {
                std::cerr << "header timeout" << std::endl;
                socket_.cancel();
            }
        });

        boost::asio::async_read(
            socket_, boost::asio::buffer(res.data(), res.header_length),
            [ this, self ](error_code ec, size_t /*length*/) {
                std::cerr << "header: " << ec.message() << std::endl;
                recv_deadline_.cancel();
                if (!ec && res.decode_header()) {
                    do_read_body();
                } else {
                    // Graceful shutdown; the socket itself is closed
                    // in the Session destructor.
                    socket_.shutdown(tcp::socket::shutdown_both);
                }
            });
    }

    // Once a header arrived, the full body must follow within 1s.
    void do_read_body() {
        auto self(shared_from_this());
        std::cout << "do_read_body: " << res.body_length() << std::endl;

        recv_deadline_.expires_after(1s);
        recv_deadline_.async_wait([self, this] (error_code ec) {
            if (!ec) {
                std::cerr << "body timeout" << std::endl;
                socket_.cancel();
            }
        });

        boost::asio::async_read(
            socket_,
            boost::asio::buffer(res.body(), res.body_length()),
            boost::asio::transfer_exactly(res.body_length()),
            [ this, self ](error_code ec, std::size_t length) {
                std::cerr << "body: " << ec.message() << std::endl;
                recv_deadline_.cancel();
                if (!ec) {
                    try {
                        // Not safe to print unless NUL-terminated, see e.g.
                        // https://stackoverflow.com/questions/66278813/boost-deadline-timer-causes-stack-buffer-overflow/66279497#66279497
                        if (length)
                            request_queue_.send(res.body(), res.body_length(), 0);
                    } catch (const std::exception& ex) {
                        std::cout << ex.what() << std::endl;
                    }
                    // Loop back for the next request on this session.
                    do_read_header();
                } else {
                    socket_.shutdown(tcp::socket::shutdown_both);
                }
            });
    }
};

class Server {
  public:
    Server(boost::asio::io_service& io_service, short port)
        : acceptor_(io_service, tcp::endpoint(tcp::v4(), port)),
          socket_(io_service) {
        do_accept();
    }

  private:
    void do_accept() {
        acceptor_.async_accept(socket_, [ this ](error_code ec) {
            std::cerr << "async_accept: " << ec.message() << std::endl;
            if (!ec) {
                std::cerr << "session: " << socket_.remote_endpoint() << std::endl;
                std::make_shared<Session>(std::move(socket_))->start();
            }

            do_accept();
        });
    }

    tcp::acceptor acceptor_;
    tcp::socket   socket_;
};

// With no arguments: run the server for 10 seconds.
// With any argument: act as the consumer, draining the shared queue.
int main(int argc, char**) {
    // Ensure the interprocess queue exists before either role uses it.
    Queue queue{bip::open_or_create, "SendQueue", MAX_MESGS, MAX_MESG_LEN}; // ensure it exists

    if (argc == 1) {
        boost::asio::io_context ioc;
        Server s(ioc, 8989);

        // Bounded run so the demo terminates by itself.
        ioc.run_for(10s);
    } else {
        // Consumer: blocks on receive; this demo loop never exits.
        while (true) {
            using Buf = std::array<char, MAX_MESG_LEN>;
            Buf      buf;
            unsigned prio;
            size_t   n;
            queue.receive(buf.data(), buf.size(), n, prio);

            std::cout << "Received: " << std::quoted(std::string_view(buf.data(), n)) << std::endl;
        }
    }
}

测试:

./sotest

在另一个终端:

./sotest consumer

然后在另一个终端中,发送一些不会超时的请求,例如:

for msg in '0000 0000' '0000 0001 31' '0000 000c 6865 6c6c 6f20 776f 726c 640a'
do
    xxd -r -p <<< "$msg" |
        netcat localhost 8989 -w 1
done

或者,在单次 session 中发送多个请求(multi-request),并让该 session 超时(-w 6 超过了 5s):

msg='0000 0000 0000 0001 31 0000 000c 6865 6c6c 6f20 776f 726c 640a'; xxd -r -p <<< "$msg"| netcat localhost 8989 -w 6