io_context.run() 在单独的线程块中

io_context.run() in a separate thread blocks

我使用 boost.beast 实现了 RESTServer.hpp,如下所示。

#pragma once

#include <boost/property_tree/json_parser.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/version.hpp>
#include <boost/asio.hpp>
#include <chrono>
#include <cstdlib>
#include <ctime>
#include <iostream>
#include <memory>
#include <string>

namespace beast = boost::beast;
namespace http = beast::http;
namespace net = boost::asio;
using tcp = boost::asio::ip::tcp;

class RESTServer : public std::enable_shared_from_this<RESTServer> {
public:
    RESTServer(tcp::socket socket)
        : m_socket(std::move(socket)) {
    }

    void start() {
        readRequest();
        checkDeadline();
    }

private:
    tcp::socket m_socket;
    beast::flat_buffer m_buffer{8192};
    http::request<http::dynamic_body> m_request;
    http::response<http::dynamic_body> m_response;

    net::steady_timer m_deadline{m_socket.get_executor(), std::chrono::seconds(60)};

    void readRequest() {
        auto self = shared_from_this();
        http::async_read(m_socket, m_buffer, m_request, [self](beast::error_code ec, std::size_t bytes_transferred) {
            boost::ignore_unused(bytes_transferred);
            if (!ec) {
                self->processRequest();
            }
        });
    }

    void processRequest() {
        m_response.version(m_request.version());
        m_response.keep_alive(false);

        switch (m_request.method()) {
            case http::verb::get:
                m_response.result(http::status::ok);
                m_response.set(http::field::server, "Beast");
                createResponse();
                break;

            case http::verb::post:
                m_response.result(http::status::ok);
                m_response.set(http::field::server, "Beast");
                createResponse();
                break;

            default:
                m_response.result(http::status::bad_request);
                m_response.set(http::field::content_type, "text/plain");
                beast::ostream(m_response.body())
                    << "Invalid request-method '"
                    << std::string(m_request.method_string())
                    << "'";
                break;
        }
        writeResponse();
    }

    void createResponse() {
        if(request_.target() == "/count")
        {
            response_.set(http::field::content_type, "text/html");
            beast::ostream(response_.body())
                << "<html>\n"
                <<  "<head><title>Request count</title></head>\n"
                <<  "<body>\n"
                <<  "<h1>Request count</h1>\n"
                <<  "<p>There have been "
                <<  my_program_state::request_count()
                <<  " requests so far.</p>\n"
                <<  "</body>\n"
                <<  "</html>\n";
        }
        else if(request_.target() == "/time")
        {
            response_.set(http::field::content_type, "text/html");
            beast::ostream(response_.body())
                <<  "<html>\n"
                <<  "<head><title>Current time</title></head>\n"
                <<  "<body>\n"
                <<  "<h1>Current time</h1>\n"
                <<  "<p>The current time is "
                <<  my_program_state::now()
                <<  " seconds since the epoch.</p>\n"
                <<  "</body>\n"
                <<  "</html>\n";
        }
        else
        {
            response_.result(http::status::not_found);
            response_.set(http::field::content_type, "text/plain");
            beast::ostream(response_.body()) << "File not found\r\n";
        }
    }

    void writeResponse() {
        auto self = shared_from_this();

        m_response.set(http::field::content_length, m_response.body().size());

        http::async_write(m_socket, m_response,
                          [self](beast::error_code ec, std::size_t) {
                              self->m_socket.shutdown(tcp::socket::shutdown_send, ec);
                              self->m_deadline.cancel();
                          });
    }

    void checkDeadline() {
        auto self = shared_from_this();

        m_deadline.async_wait([self](beast::error_code ec) {
            if (!ec) {
                self->m_socket.close(ec);
            }
        });
    }
};

void httpServer(tcp::acceptor& acceptor, tcp::socket& socket) {
    acceptor.async_accept(socket, [&](beast::error_code ec) {
        if (!ec) {
            std::make_shared<RESTServer>(std::move(socket))->start();
        }
        httpServer(acceptor, socket);
    });
}

我还有一个 RESTClient RESTClient.hppRESTClient.cpp,如下所示。

RESTClient.hpp

#pragma once

#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/version.hpp>
#include <boost/asio/strand.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>

namespace beast = boost::beast;
namespace http = beast::http;
namespace net = boost::asio;
using tcp = boost::asio::ip::tcp;


// Performs an HTTP GET and prints the response
class RESTClient : public std::enable_shared_from_this<RESTClient> {

public:
    explicit RESTClient(net::io_context& ioc);

    virtual ~RESTClient();

    virtual void run(char const* host, char const* port, char const* target, int version);

    virtual void onResolve(beast::error_code ec, tcp::resolver::results_type results);

    virtual void onConnect(beast::error_code ec, tcp::resolver::results_type::endpoint_type);

    virtual void onWrite(beast::error_code ec, std::size_t bytes_transferred);

    virtual void onRead(beast::error_code ec, std::size_t bytes_transferred);

private:
    void createGetRequest(char const* host, char const* target, int version);

    void createPostRequest(char const* host, char const* target, int version, char const *body);

    std::string createBody();

    tcp::resolver m_resolver;
    beast::tcp_stream m_stream;
    beast::flat_buffer m_buffer; // (Must persist between reads)
    http::request<http::string_body> m_httpRequest;
    http::response<http::string_body> m_httpResponse;
};

RESTClient.cpp

#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/version.hpp>
#include <boost/asio/strand.hpp>
#include <boost/lexical_cast.hpp>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <string>
#include "RESTClient.hpp"

namespace beast = boost::beast;
namespace http = beast::http;
namespace net = boost::asio;
using tcp = boost::asio::ip::tcp;

void fail(beast::error_code ec, char const* what) {
    std::cerr << what << ": " << ec.message() << "\n";
}


RESTClient::RESTClient(net::io_context& ioc)
    : m_resolver(net::make_strand(ioc)), m_stream(net::make_strand(ioc)) {

}

RESTClient::~RESTClient() = default;


void RESTClient::run(char const* host, char const* port, char const* target, int version) {

    createPostRequest(host, target, version, createBody().c_str());

    m_resolver.async_resolve(host, port, beast::bind_front_handler(
        &RESTClient::onResolve,
        shared_from_this()));
}

void RESTClient::onResolve(beast::error_code ec, tcp::resolver::results_type results) {
    if (ec) {
        return fail(ec, "resolve");
    }

    std::cout << "onResolve ******" << std::endl;
    m_stream.expires_after(std::chrono::seconds(30));

    m_stream.async_connect(results, beast::bind_front_handler(
        &RESTClient::onConnect,
        shared_from_this()));
}

void
RESTClient::onConnect(beast::error_code ec, tcp::resolver::results_type::endpoint_type) {
    if (ec) {
        return fail(ec, "connect");
    }

    std::cout << "onConnect ******" << std::endl;

    m_stream.expires_after(std::chrono::seconds(30));

    http::async_write(m_stream, m_httpRequest,
                      beast::bind_front_handler(
                          &RESTClient::onWrite,
                          shared_from_this()));
}

void
RESTClient::onWrite(beast::error_code ec, std::size_t bytes_transferred) {
    boost::ignore_unused(bytes_transferred);

    if (ec) {
        return fail(ec, "write");
    }

    std::cout << "onWrite ******" << std::endl;

    http::async_read(m_stream, m_buffer, m_httpResponse, beast::bind_front_handler(
        &RESTClient::onRead,
        shared_from_this()));
}

void RESTClient::onRead(beast::error_code ec, std::size_t bytes_transferred) {

    boost::ignore_unused(bytes_transferred);

    if (ec) {
        return fail(ec, "read");
    }

    std::cout << "onRead ******" << std::endl;

    std::cout << m_httpResponse << std::endl;


    m_stream.socket().shutdown(tcp::socket::shutdown_both, ec);

    if (ec && ec != beast::errc::not_connected) {
        return fail(ec, "shutdown");
    }
}

void RESTClient::createGetRequest(char const* host, char const* target, int version) {
    m_httpRequest.version(version);
    m_httpRequest.method(http::verb::get);
    m_httpRequest.target(target);
    m_httpRequest.set(http::field::host, host);
    m_httpRequest.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING);
}


void RESTClient::createPostRequest(char const* host, char const* target, int version, char const* body) {
    m_httpRequest.version(version);
    m_httpRequest.method(http::verb::post);
    m_httpRequest.target(target);
    m_httpRequest.set(http::field::host, host);
    m_httpRequest.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING);
    m_httpRequest.set(http::field::content_length, boost::lexical_cast<std::string>(strlen(body)));
    m_httpRequest.set(http::field::body, body);
    m_httpRequest.prepare_payload();
}


std::string RESTClient::createBody() {
    boost::property_tree::ptree tree;
    boost::property_tree::read_json("test.json",tree);
    std::basic_stringstream<char> jsonStream;
    boost::property_tree::json_parser::write_json(jsonStream, tree, false);
    std::cout << "json stream :" << jsonStream.str() << std::endl;
    return jsonStream.str();
}


int main(int argc, char** argv) {
    // Check command line arguments.
    if (argc != 4 && argc != 5) {
        std::cerr <<
                  "Usage: http-client-async <host> <port> <target> [<HTTP version: 1.0 or 1.1(default)>]\n" <<
                  "Example:\n" <<
                  "    http-client-async www.example.com 80 /\n" <<
                  "    http-client-async www.example.com 80 / 1.0\n";
        return EXIT_FAILURE;
    }
    auto const host = argv[1];
    auto const port = argv[2];
    auto const target = argv[3];
    int version = argc == 5 && !std::strcmp("1.0", argv[4]) ? 10 : 11;

    // The io_context is required for all I/O
    net::io_context ioc;
    std::cout << "version: " << version << std::endl;

    // Launch the asynchronous operation
    std::make_shared<RESTClient>(ioc)->run(host, port, target, version);

    // Run the I/O service. The call will return when
    // the get operation is complete.
    ioc.run();

    return EXIT_SUCCESS;
}

现在我想使用 googletest 测试我的 RESTClient。在单元测试中,我想使用RESTServer来模拟对客户端的响应。我的测试class如下所示。

class MyTest : public ::testing::Test{
    virtual void SetUp(){
         httpServer(m_acceptor, m_socket);
         m_threads.emplace_back(boost::bind(&boost::asio::io_context::run, &m_ioc));
         m_ioc.run();
    }

    virtual void TearDown() {
    for(auto& thread : m_threads) {
        if(thread.joinable()) {
            thread.join();
        }
    }


    net::ip::address m_address = net::ip::make_address("0.0.0.0");
    unsigned short m_port = static_cast<unsigned short>(8080);
    net::io_context m_ioc{1};

    tcp::acceptor m_acceptor{m_ioc, {m_address, m_port}};
    tcp::socket m_socket{m_ioc};
    std::vector<std::thread> m_threads;

};

我的问题如下。

当我实现 class MyTest 时,我在一个单独的线程中启动 RESTServer。请参阅 SetUp() 中的代码。当我调用 m_ioc.run() 时,服务器 运行s,但是阻塞了。我希望服务器在一个单独的线程中 运行,然后继续执行我的测试用例,我在其中启动客户端并执行一些 GETPOST 操作。

您正在两个线程中调用 run

m_threads.emplace_back(boost::bind(&boost::asio::io_context::run, &m_ioc));

和google测试线程

m_ioc.run();

导致 Setup 阻塞。尝试删除 m_ioc.run(); 因为您已经生成了一个线程来调用 io_context::run.