
Asio async_read_until EOF Error in Asynchronous TCP Server

When I build it, running the server and then running the client produces the error: Error code = 2, Error Message = End of file.

When I wrote the synchronous TCP server, it worked fine.

Thanks.

Full client code:

#include <boost/predef.h> // Tools to identify the os

#ifdef BOOST_OS_WINDOWS
#define _WIN32_WINNT 0x0501

#if _WIN32_WINNT <= 0x0502
    #define BOOST_ASIO_DISABLE_TOCP
    #define BOOST_ASIO_ENABLE_CANCELIO
#endif
#endif

#include <boost/asio.hpp>
#include <mutex>
#include <thread>
#include <memory>
#include <iostream>
#include <map>

using namespace boost;


typedef void(*Callback) (unsigned int request_id, const std::string& response, const system::error_code& ec);

struct Session{
    Session(asio::io_service& ios, const std::string& raw_ip_address, unsigned short port_num, const std::string& request, unsigned int id, Callback callback) : m_sock(ios), m_ep(asio::ip::address::from_string(raw_ip_address),port_num), m_request(request), m_id(id), m_callback(callback), m_was_cancelled(false) {} 

    asio::ip::tcp::socket m_sock;
    asio::ip::tcp::endpoint m_ep; // Remote endpoint
    std::string m_request;

    // streambuf where the response will be stored.
    asio::streambuf m_response_buf;
    std::string m_response; // Response represented as a string

    system::error_code m_ec;

    unsigned int m_id;

    Callback m_callback;

    bool m_was_cancelled;
    std::mutex m_cancel_guard;
};

class AsyncTCPClient : public boost::asio::noncopyable {
public: 
    AsyncTCPClient(){
        m_work.reset(new boost::asio::io_service::work(m_ios));

        m_thread.reset(new std::thread([this](){
            m_ios.run();
        }));
    }


    void emulateLongComputationOp( unsigned int duration_sec, const std::string& raw_ip_address, unsigned short port_num, Callback callback, unsigned int request_id){
        std::string request = "EMULATE_LONG_CALC_OP " + std::to_string(duration_sec) + "\n";
        std::cout << "Request: " << request << std::endl;

        std::shared_ptr<Session> session = std::shared_ptr<Session> (new Session(m_ios, raw_ip_address, port_num, request, request_id, callback));

        session->m_sock.open(session->m_ep.protocol());

        // active sessions list can be accessed from multiple thread, we guard it with a mutex to avoid data coruption
        std::unique_lock<std::mutex> lock(m_active_sessions_guard);
        m_active_sessions[request_id] = session;
        lock.unlock();

        session->m_sock.async_connect(session->m_ep, [this, session](const system::error_code& ec) {
            if (ec.value() != 0) {
                session->m_ec = ec;
                onRequestComplete(session);
                return;
            }

            std::unique_lock<std::mutex> cancel_lock(session->m_cancel_guard);

            if (session->m_was_cancelled) {
                onRequestComplete(session);
                return;
            }

            asio::async_write(session->m_sock, asio::buffer(session->m_request), [this, session](const boost::system::error_code &ec, std::size_t bytes_transferred) {
                if (ec.value() != 0) {
                    session->m_ec = ec;
                    onRequestComplete(session);
                    return;
                }
                std::unique_lock<std::mutex> cancel_lock(session->m_cancel_guard);

                if (session->m_was_cancelled) {
                    onRequestComplete(session);
                    return;
                }
                
                asio::async_read_until(session->m_sock, session->m_response_buf, '\n',
                                       [this, session](const boost::system::error_code &ec,
                                                       std::size_t bytes_transferred) {
                                           if (ec.value() != 0) {
                                               session->m_ec = ec;
                                           } else {
                                               std::istream strm(&session->m_response_buf);
                                               std::getline(strm, session->m_response);
                                           }

                                           onRequestComplete(session);
                                       });
            });
        });
    };

    // Cancels the request
    void cancelRequest(unsigned int request_id){
        std::unique_lock<std::mutex> lock(m_active_sessions_guard);

        auto it = m_active_sessions.find(request_id);
        if(it != m_active_sessions.end()){
            std::unique_lock<std::mutex> cancel_lock(it->second->m_cancel_guard);

            it->second->m_was_cancelled = true;
            it->second->m_sock.cancel();
        }
    }


    void close(){
        // Destroy work object
        m_work.reset(NULL);
        // wait for the I/O thread tot exit
        m_thread->join();
    }

private:
    void onRequestComplete(std::shared_ptr<Session> session){
        // shutting down the connection, we don't care about the error code if function failed
        boost::system::error_code ignored_ec;

        session->m_sock.shutdown(asio::ip::tcp::socket::shutdown_both, ignored_ec);

        // remove session from the map of active sessions
        std::unique_lock<std::mutex> lock(m_active_sessions_guard);

        auto it = m_active_sessions.find(session->m_id);
        if(it != m_active_sessions.end()){
            m_active_sessions.erase(it);
        }

        lock.unlock();

        boost::system::error_code ec;

        if(session->m_ec.value() == 0 && session->m_was_cancelled){
            ec = asio::error::operation_aborted;
        }else{
            ec = session->m_ec;
        }

        session->m_callback(session->m_id, session->m_response, ec);
    };
private:
    asio::io_service m_ios;
    std::map<int, std::shared_ptr<Session>> m_active_sessions;
    std::mutex m_active_sessions_guard;
    std::unique_ptr<boost::asio::io_service::work> m_work;
    std::unique_ptr<std::thread> m_thread;
};


void handler(unsigned int request_id, const std::string& response, const system::error_code& ec){
    if(ec.value() == 0){
        std::cout << "Request #" << request_id << " has completed. Reponse: "<< response << std::endl;
    }else if(ec == asio::error::operation_aborted){
        std::cout << "Request #" << request_id << " has been cancelled by the user. "  << std::endl;
    }else{
        std::cout << "Request #" << request_id << " failed! Error code = " << ec.value() << ". Error Message = " << ec.message() << std::endl;
    }
    return;
}


int main(){
    try{
        AsyncTCPClient client;

        // emulate the user's behavior
        client.emulateLongComputationOp(10, "127.0.0.1", 3333, handler, 1);

        std::this_thread::sleep_for(std::chrono::seconds(60));

        // another request with id 2
        client.emulateLongComputationOp(11, "127.0.0.1", 3334, handler, 2);

        // cancel request 1
        client.cancelRequest(1);

        std::this_thread::sleep_for(std::chrono::seconds(6));

        // another request with id 3
        client.emulateLongComputationOp(12, "127.0.0.1", 3335, handler, 3);

        std::this_thread::sleep_for(std::chrono::seconds(15));

        // exit the application
        client.close();
    }
    catch(system::system_error &e){
        std::cout << "Error occured! Error code = " << e.code() << ". Message: " << e.what();

        return e.code().value();
    }

    return 0;
}

Full server code:

#include <boost/asio.hpp>

#include <thread>
#include <atomic>
#include <memory>
#include <iostream>

using namespace boost;

class Service {
public:
    Service(std::shared_ptr<asio::ip::tcp::socket> sock) : m_sock(sock) {}

    void StartHandling() {
        asio::async_read_until(*m_sock.get(), m_request, '\n', [this](const boost::system::error_code& ec, std::size_t bytes_transferred){
            onRequestReceived(ec, bytes_transferred);
        });
        std::istream is(&m_request);
        std::string line;
        std::getline(is, line);
        std::cout << "m_request: " << line << std::endl;
    }

private:
    void onRequestReceived(const boost::system::error_code& ec, std::size_t bytes_transfered){
        std::cout << "ec.value : " << ec.value() << std::endl;
        if (ec.value() != 0){
            std::cout << "Error occurred! Error code = " << ec.value() << ".Message: " << ec.message();
            onFinish();
            return;
        }

        // Process the request
        asio::async_write(*m_sock.get(), asio::buffer(m_response), [this](const boost::system::error_code& ec, std::size_t bytes_transferred){
            onResponseSent(ec, bytes_transferred);
        });
    }

    void onResponseSent(const boost::system::error_code& ec, std::size_t bytes_transferred){
        if(ec.value() != 0){
            std::cout << "Error occurred! Error code = " << ec.value() << ". Message: " << ec.message();
        }

        onFinish();
    }

    // cleanup
    void onFinish(){
        delete this;
    }

    std::string ProcessingRequest(asio::streambuf& request){
        // parse the request, process it and prepare the request

        // Emulating CPU-consuming operations
        int i = 0;
        while (i != 1000){
            i++;
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }

        std::string response = "Response\n";
        return response;
    }

    std::shared_ptr<asio::ip::tcp::socket> m_sock;
    std::string m_response;
    asio::streambuf m_request;
};


class Acceptor {
public:
    Acceptor(asio::io_service& ios, unsigned short port_num) : m_ios(ios), m_acceptor(m_ios, asio::ip::tcp::endpoint(asio::ip::address_v4::any(), port_num)), m_isStopped(
            false) {}
    // Start accepting incoming connection request.
    void Start(){
        m_acceptor.listen();
        InitAccept();
    }

    void Stop() {
        m_isStopped.store(true);
    }

private:
    void InitAccept() {
        std::shared_ptr<asio::ip::tcp::socket> sock(new asio::ip::tcp::socket(m_ios));

        m_acceptor.async_accept(*sock.get(), [this, sock](const boost::system::error_code& error){
            onAccept(error, sock);
        });
    }

    void onAccept(const boost::system::error_code& ec, std::shared_ptr<asio::ip::tcp::socket> sock){
        if(ec.value() == 0){
            (new Service(sock))->StartHandling();
        }else{
            std::cout << "Error occurred! Error code = " << ec.value() << ". Message: " << ec.message();
        }

        // Init next accept operation if acceptor has not been stopped yet
        if(!m_isStopped.load()){
            InitAccept();
        }else{
            // free resources
            m_acceptor.close();
        }
    }

private:
    asio::io_service& m_ios;
    asio::ip::tcp::acceptor m_acceptor;
    std::atomic<bool> m_isStopped;
};


class Server{
public:
    Server() {
       m_work.reset(new asio::io_service::work(m_ios));
    }

    // Start the server
    void Start(unsigned short port_num, unsigned int thread_pool_size){
        assert(thread_pool_size > 0);

        // Create and start Acceptor
        acc.reset(new Acceptor(m_ios, port_num));
        acc->Start();

        // Create specified number of thread and add them to the pool
        for(unsigned int i = 0; i < thread_pool_size; i++){
            std::cout << "Thread " << i << " Running !";
            std::unique_ptr<std::thread> th(new std::thread([this](){
                m_ios.run();
            }));
            m_thread_pool.push_back(std::move(th));
        }
    }

    // Stop the Server
    void Stop(){
        acc->Stop();
        m_ios.stop();

        for(auto& th : m_thread_pool){
            th->join();
        }
    }

private:
    asio::io_service m_ios;
    std::unique_ptr<asio::io_service::work> m_work;
    std::unique_ptr<Acceptor> acc;
    std::vector<std::unique_ptr<std::thread>> m_thread_pool;
};

const unsigned int DEFAULT_THREAD_POOL_SIZE = 2;

int main(){
    unsigned short port_num = 3333;

    try{
        Server srv;

        unsigned int thread_pool_size = std::thread::hardware_concurrency() * 2;

        if (thread_pool_size == 0){
            thread_pool_size = DEFAULT_THREAD_POOL_SIZE;
        }

        srv.Start(port_num, thread_pool_size);

        std::this_thread::sleep_for(std::chrono::seconds(60));

        srv.Stop();
    }
    catch(system::system_error &e){
        std::cout << "Error occurred! Error code = " << e.code() << ". Message: " << e.what();
    }

    return 0;
}

The server closes the connection after sending the (empty) response. That naturally results in EOF on the client. Just handle it gracefully.
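
For instance, one way to handle it on the client side is to treat EOF as a normal end of the response. This is a sketch of the innermost read handler from the question's emulateLongComputationOp; treating asio::error::eof as benign is one possible choice, not the only one:

    asio::async_read_until(session->m_sock, session->m_response_buf, '\n',
        [this, session](const boost::system::error_code& ec, std::size_t /*bytes_transferred*/) {
            if (ec && ec != asio::error::eof) {
                // a genuine failure
                session->m_ec = ec;
            } else {
                // success, or the peer closed the connection (EOF):
                // read whatever did arrive, possibly an empty line
                std::istream strm(&session->m_response_buf);
                std::getline(strm, session->m_response);
            }
            onRequestComplete(session);
        });

The more structural fix is of course for the server to actually produce a response before writing and shutting the connection down; the demo at the end does exactly that.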

There are quite a few code smells:

  • delete this; is an abomination; just make Service use shared_from_this.

  • Apart from that, there is no need to use shared_ptrs at all.

  • When you do use smart pointers, actually use them. Don't "convert to raw pointer" just to dereference (so *m_sock instead of *m_sock.get()).

  • In fact, there should be no need for new, delete, or get() anywhere in your code.

  • You access m_request immediately after the async_read_until call, which is too early:

    • it is a data race (hence Undefined Behaviour)
    • the request has not been received yet, because async_read_until has not completed.

    So, at the very least, move that code into onRequestReceived (see the sketch after this list).

  • There is no need at all to use an istream to read the line from the request when you already have bytes_transferred. I'd suggest:

       if (bytes_transferred) {
           std::string line(asio::buffer_cast<char const*>(m_request.data()),
                            bytes_transferred - 1);
           m_request.consume(bytes_transferred);
           std::cout << "request: " << line << std::endl;
       }
    

    Or even:

       std::cout << "request: ";
       std::cout.write(asio::buffer_cast<char const*>(m_request.data()),
                       bytes_transferred - 1);
       m_request.consume(bytes_transferred);
    

    Or, if you really want to display the whole m_request, simply:

       std::cout << "m_request: " << &m_request << std::endl;
    
  • Note that read_until may read more than just up to the delimiter; to be safe you may want to verify that no other data trails the line, or handle that as well.

  • Never switch on error_code::value(): you lose the error category, which is essential to interpret error codes.

  • Why a unique_ptr for each thread? Just use a deque<thread>:

     while (thread_pool_size--)
         m_thread_pool.emplace_back([this] { m_ios.run(); });
    

    But see Should the exception thrown by boost::asio::io_service::run() be caught?

  • Why a unique_ptr for the acceptor?

  • Why a separate class for the acceptor? It's not as if the server allows more than one.

  • Why a vector of threads at all? Prefer boost::thread_group.

  • Why a manual thread pool anyway? Prefer asio::thread_pool; it already uses hardware_concurrency when available.
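
Putting the read-side points together, here is a minimal sketch (not a drop-in patch: it assumes Service derives from std::enable_shared_from_this<Service> and is created via std::make_shared, so onFinish/delete this disappear) of what StartHandling and onRequestReceived could look like:

    void StartHandling() {
        asio::async_read_until(*m_sock, m_request, '\n',
            [self = shared_from_this()](const boost::system::error_code& ec, std::size_t bytes) {
                self->onRequestReceived(ec, bytes);
            });
        // note: no touching m_request here; it is only valid inside the handler
    }

    void onRequestReceived(const boost::system::error_code& ec, std::size_t bytes_transferred) {
        if (ec) { // test the error_code itself, never ec.value()
            std::cout << "Error occurred! Message: " << ec.message() << std::endl;
            return;
        }

        // the request line is available only now that the read has completed
        if (bytes_transferred) {
            std::string line(asio::buffer_cast<char const*>(m_request.data()),
                             bytes_transferred - 1); // drop the '\n'
            m_request.consume(bytes_transferred);
            std::cout << "request: " << line << std::endl;
        }

        // actually produce a response (the question's ProcessingRequest ignores
        // its argument; the demo below parses the request text instead)
        m_response = ProcessingRequest(m_request);

        asio::async_write(*m_sock, asio::buffer(m_response),
            [self = shared_from_this()](const boost::system::error_code& ec, std::size_t bytes) {
                self->onResponseSent(ec, bytes);
            });
    }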

In terms of review, the TCPAsyncClient looks like an attempt to implement the async_result protocol. It misses the mark in many ways, so I'll just point out that similar facilities already exist; they have a very similar interface (except perhaps for cancellation, if I remember correctly).
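
For comparison, here is a minimal sketch (assuming a recent Boost; the request format and port match the demo below) of how the same round trip looks when you let a completion token such as asio::use_future produce the result type for you:

    #include <boost/asio.hpp>
    #include <future>
    #include <iostream>

    namespace asio = boost::asio;
    using asio::ip::tcp;

    int main() {
        asio::thread_pool io;
        tcp::socket sock(io.get_executor());

        try {
            // async_connect with the use_future token returns a std::future<void>
            sock.async_connect({asio::ip::make_address("127.0.0.1"), 3333}, asio::use_future)
                .get(); // rethrows on failure

            std::string request = "EMULATE_LONG_CALC_OP 0.1s\n", response;
            asio::async_write(sock, asio::buffer(request), asio::use_future).get();

            std::size_t n = asio::async_read_until(sock, asio::dynamic_buffer(response), '\n',
                                                   asio::use_future).get();
            std::cout << "response: " << response.substr(0, n - 1) << std::endl; // drop '\n'
        } catch (std::exception const& e) {
            std::cout << "error: " << e.what() << std::endl;
        }

        io.join();
    }

Cancellation is the one part that doesn't carry over as directly, which is why the demo below keeps an explicit RequestOp with a Cancel() member.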

Fixed/Return Demo

Here's the completed example. It includes request parsing, so the server actually waits for the requested amount of time.

I scaled all the timings down by a factor of 10 so it can be run online.

Client and server are in a single source file. Start them with:

./sotest&
./sotest client
wait

Both complete within 6 seconds (see the output below).

Live On Coliru

#include <boost/asio.hpp>
#include <boost/spirit/home/x3.hpp> // for request parsing
#include <iomanip>
#include <iostream>
#include <map>
#include <mutex>
#include <thread>

namespace asio = boost::asio;
using asio::ip::tcp;
using boost::system::error_code;
using namespace std::chrono_literals;
using std::this_thread::sleep_for;

/////// server //////////////////////////////////////////////////////////
struct Service : std::enable_shared_from_this<Service> {
    Service(tcp::socket sock) : m_sock(std::move(sock)) {}

    void StartHandling() {
        async_read_until(
            m_sock, asio::dynamic_buffer(m_request), '\n',
            [this, self = shared_from_this()](error_code ec, size_t bytes) {
                onRequestReceived(ec, bytes);
            });
    }

  private:
    void onRequestReceived(error_code ec, size_t /*bytes*/) {
        std::cout << "onRequestReceived: " << ec.message() << std::endl;
        if (ec)
            return;

        // Process the request
        m_response = ProcessingRequest(m_request);

        async_write(
            m_sock, asio::buffer(m_response),
            [this, self = shared_from_this()](error_code ec, size_t bytes) {
                onResponseSent(ec, bytes);
            });
    }

    void onResponseSent(error_code ec, size_t /*bytes*/) {
        std::cout << "onResponseSent: " << ec.message() << std::endl;
    }

    std::string static ProcessingRequest(std::string request) {
        std::cout << "request: " << request << std::endl;

        // parse the request, process it and prepare the response
        namespace x3 = boost::spirit::x3;
        double value;
        if (parse(request.begin(), request.end(),
                "EMULATE_LONG_CALC_OP " >> x3::double_ >> "s" >> x3::eol >> x3::eoi,
                value)) //
        {
            // Emulating time-consuming operation
            sleep_for(1.0s * value);
            return "Waited " + std::to_string(value) + "s\n";
        }

        return "Unknown request\n";
    }

    tcp::socket m_sock;
    std::string m_request, m_response;
};

struct Server {
    Server(asio::any_io_executor ex, uint16_t port_num)
        : m_acceptor{ex, {{}, port_num}} {
        m_acceptor.listen();
        accept_loop();
    }

    void Stop() { m_acceptor.cancel(); }

  private:
    void accept_loop() {
        m_acceptor.async_accept([this](error_code ec, tcp::socket sock) {
            std::cout << "OnAccept: " << ec.message() << std::endl;
            if (!ec) {
                std::make_shared<Service>(std::move(sock))->StartHandling();
                accept_loop();
            } 
            //m_acceptor.close();
        });
    }

    tcp::acceptor m_acceptor;
};

void server(uint16_t port) try {
    asio::thread_pool io;
    Server            srv{io.get_executor(), port};

    sleep_for(6s);

    srv.Stop();
    io.join();
} catch (std::exception const& e) {
    std::cout << "Exception: " << e.what() << std::endl;
}

/////// client //////////////////////////////////////////////////////////

struct RequestOp : public std::enable_shared_from_this<RequestOp> {
    using Callback = std::function<void( //
        unsigned /*request_id*/, std::string_view /*response*/, error_code)>;

    RequestOp(asio::any_io_executor ex, const std::string& raw_ip_address,
              uint16_t port_num, std::string request, unsigned id,
              Callback callback)
        : m_ep(asio::ip::address::from_string(raw_ip_address), port_num)
        , m_sock(ex, m_ep.protocol())
        , m_request(std::move(request))
        , m_id(id)
        , m_callback(callback) {}

    void Run() {
        // assumed on logical strand
        m_sock.async_connect(
            m_ep, [this, self = shared_from_this()](error_code ec) {
                if ((m_ec = ec) || m_was_cancelled)
                    return onComplete();

                asio::async_write(m_sock, asio::buffer(m_request),
                                  [this, self = shared_from_this()](
                                      error_code ec, size_t /*bytes*/) {
                                      onRequestWritten(ec);
                                  });
            });
    }

    void Cancel() {
        m_was_cancelled = true;
        dispatch(m_sock.get_executor(), [self=shared_from_this()]{ self->doCancel(); });
    }

  private:
    void doCancel() {
        m_sock.cancel();
    }

    void onRequestWritten(error_code ec) {
        if ((m_ec = ec) || m_was_cancelled)
            return onComplete();

        asio::async_read_until(
            m_sock, asio::dynamic_buffer(m_response), '\n',
            [this, self = shared_from_this()](error_code ec, size_t bytes) {
                onResponseReceived(ec, bytes);
            });
    }

    void onResponseReceived(error_code ec, size_t /*bytes*/) {
        if ((m_ec = ec) || m_was_cancelled)
            return onComplete();

        if (!m_response.empty())
            m_response.resize(m_response.size() - 1); // drop '\n'

        onComplete();
    }

    void onComplete() {
        // shutting down the connection, we don't care about the error code
        // if function failed
        error_code ignored_ec;
        m_sock.shutdown(tcp::socket::shutdown_both, ignored_ec);

        if(!m_ec && m_was_cancelled){
            m_ec = asio::error::operation_aborted;
        }

        m_callback(m_id, m_response, m_ec);
    }

    tcp::endpoint m_ep; // Remote endpoint
    tcp::socket   m_sock;
    std::string   m_request;

    std::string m_response; // Response represented as a string

    error_code m_ec;

    unsigned m_id;

    Callback m_callback;

    std::atomic_bool m_was_cancelled{false};
};

class AsyncTCPClient {
  public:
    AsyncTCPClient(asio::any_io_executor ex) : m_executor(ex) {}

    using Duration = std::chrono::steady_clock::duration;

    size_t emulateLongCalcOp(Duration delay, std::string const& raw_ip_address,
                             uint16_t port_num, RequestOp::Callback callback) {
        auto request =
            "EMULATE_LONG_CALC_OP " + std::to_string(delay / 1.0s) + "s\n";
        std::cout << "Request: " << request << std::flush;

        auto const request_id = m_nextId++;
        auto session = std::make_shared<RequestOp>(
            make_strand(m_executor), //
            raw_ip_address, port_num, request, request_id, callback);

        {
            // active sessions list can be accessed from multiple thread, we
            // guard it with a mutex to avoid data coruption
            std::unique_lock lock(m_active_sessions_guard);

            auto [_,ok] = m_pending_ops.emplace(request_id, session);
            assert(ok); // duplicate request_id?

            // optionally: garbage collect completed sessions
            std::erase_if(m_pending_ops,
                          [](auto& kv) { return kv.second.expired(); });
        };

        session->Run();
        return request_id;
    }

    // Cancels the request
    void cancelRequest(unsigned request_id) {
        std::unique_lock lock(m_active_sessions_guard);

        if (auto session = m_pending_ops[request_id].lock())
            session->Cancel();
    }

  private:
    using PendingOp = std::weak_ptr<RequestOp>;

    asio::any_io_executor    m_executor;
    std::mutex               m_active_sessions_guard;
    size_t                   m_nextId = 1;
    std::map<int, PendingOp> m_pending_ops;
};

void handler(unsigned request_id, std::string_view response, error_code ec) {
    std::cout << "Request #" << request_id << " ";

    if (!ec.failed())
        std::cout << "Response: " << std::quoted(response) << std::endl;
    else if (ec == asio::error::operation_aborted)
        std::cout << "Cancelled" << std::endl;
    else
        std::cout << ec.message() << std::endl;
}

void client(uint16_t port) try {
    asio::thread_pool io;

    {
        AsyncTCPClient client(io.get_executor());

        auto id1 = client.emulateLongCalcOp(4s, "127.0.0.1", port, handler);
        auto id2 = client.emulateLongCalcOp(1100ms, "127.0.0.1", port, handler);
        auto id3 = client.emulateLongCalcOp(3500ms, "127.0.0.1", port, handler);

        // cancel request 1
        sleep_for(3s);
        client.cancelRequest(id1);

        sleep_for(1200ms);

        client.cancelRequest(id2); // no effect, already completed
        client.cancelRequest(id3); // no effect, already completed
        // exit the application
    }

    io.join();
} catch (std::exception const& e) {
    std::cout << "Exception: " << e.what() << std::endl;
}

/////// main //////////////////////////////////////////////////////////
int main(int argc, char**) {
    if (argc > 1)
        client(3333);
    else
        server(3333);
}

The client prints:

Request: EMULATE_LONG_CALC_OP 4.000000s
Request: EMULATE_LONG_CALC_OP 1.100000s
Request: EMULATE_LONG_CALC_OP 3.500000s
Request #2 Response: "Waited 1.100000s"
Request #1 Cancelled
Request #3 Response: "Waited 3.500000s"

The server prints:

OnAccept: Success
OnAccept: Success
onRequestReceived: Success
request: EMULATE_LONG_CALC_OP 1.100000s

onRequestReceived: Success
request: EMULATE_LONG_CALC_OP 4.000000s

OnAccept: Success
onRequestReceived: Success
request: EMULATE_LONG_CALC_OP 3.500000s

onResponseSent: Success
onResponseSent: Success
onResponseSent: Success
OnAccept: Operation canceled