Trying to write a UDP server class, io_context doesn't block

I am trying to open a UDP server. A baby example works (I receive what I expect, which is also what Wireshark shows).

Baby example:

int main(int argc, char* const argv[])
{
  try 
  {
    boost::asio::io_context io_context;
    boost::asio::ip::udp::endpoint ep(boost::asio::ip::udp::v4(), 60001);
    boost::asio::ip::udp::socket sock(io_context, ep);
    UDPServer server(std::move(sock), callbackUDP);
    io_context.run();
  }
  catch (std::exception& e) 
  {
      std::cerr << e.what() << std::endl;
  }
}

UDPServer.hpp:

#include <boost/asio.hpp>
#include <functional>
#include <vector>
#include <thread>

#define BUFFERSIZE 1501

class UDPServer
{
public:
    explicit UDPServer(boost::asio::ip::udp::socket socket, std::function<void(const std::vector<char>&)> callbackFunction);
    virtual ~UDPServer();
private:
    void read();
    boost::asio::ip::udp::socket socket_;
    boost::asio::ip::udp::endpoint endpoint_;
    std::function<void(const std::vector<char>&)> callbackFunction_;
    char data_[1500 + 1]; // 1500 bytes is a safe limit as it is the max of an Ethernet frame, +1 is for the \0 terminator
};

UDPServer.cpp:

#include <cstring>   // strcmp
#include <iostream>
#include "UDPServer.h"

UDPServer::UDPServer(boost::asio::ip::udp::socket socket, std::function<void(const std::vector<char>&)> callbackFunction):
socket_(std::move(socket)),
callbackFunction_(callbackFunction)
{
    read();
}

UDPServer::~UDPServer()
{
}

void UDPServer::read() 
{
    socket_.async_receive_from(boost::asio::buffer(data_, 1500), endpoint_,
        [this](boost::system::error_code ec, std::size_t length) 
        {
            if (ec)
            {
                return;
            }
            data_[length] = '\0';
            if (strcmp(data_, "\n") == 0)
            {
                return;
            }
            std::vector<char> dataVector(data_, data_ + length);
            callbackFunction_(dataVector);
            read();
        }
    );
}

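Now I want to convert this into a class whose constructor takes only the port and a callback function (let's forget the latter and just print the message for now; adding a callback is usually not a problem).

I tried the following, but it doesn't work: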

int main(int argc, char* const argv[])
{
  UDPServer server(60001);
}

UDPServer.h:

#include <boost/asio.hpp>
#include <functional>
#include <vector>
#include <thread>

#define BUFFERSIZE 1501

class UDPServer
{
public:
    explicit UDPServer(uint16_t port);
    virtual ~UDPServer();
private:
    boost::asio::io_context io_context_;
    boost::asio::ip::udp::socket socket_;
    boost::asio::ip::udp::endpoint endpoint_;
    std::array<char, BUFFERSIZE> recv_buffer_;
    std::thread thread_;
    void run();
    void start_receive();
    void handle_reply(const boost::system::error_code& error, std::size_t bytes_transferred);
};

UDPServer.cpp:

#include <iostream>
#include "UDPServer.h"
#include <boost/function.hpp>
#include <boost/bind.hpp>
#include <iostream>

UDPServer::UDPServer(uint16_t port):
endpoint_(boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), port)),
io_context_(),
socket_(io_context_, boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), port)),
thread_(&UDPServer::run, this)
{
        start_receive();
}

UDPServer::~UDPServer()
{
    io_context_.stop();
    thread_.join();
}

void UDPServer::start_receive()
{
    socket_.async_receive_from(boost::asio::buffer(recv_buffer_), endpoint_,
        boost::bind(&UDPServer::handle_reply, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}

void UDPServer::handle_reply(const boost::system::error_code& error, std::size_t bytes_transferred)
{
    if (!error)
    {
        try {
            std::string string(recv_buffer_.data(), recv_buffer_.data() + bytes_transferred);
            std::cout << "Message received: " << std::to_string(bytes_transferred) << ", " << string << std::endl;
        }
        catch (std::exception ex) {
            std::cout << "handle_reply: Error parsing incoming message:" << ex.what() << std::endl;
        }
        catch (...) 
        {
            std::cout << "handle_reply: Unknown error while parsing incoming message" << std::endl;
        }
    }
    else
    {
        std::cout << "handle_reply: error: " << error.message() << std::endl;
    }
    start_receive();
}

void UDPServer::run()
{
    try {
        io_context_.run();
    } catch( const std::exception& e ) 
    {
        std::cout << "Server network exception: " << e.what() << std::endl;
    }
    catch(...) 
    {
        std::cout << "Unknown exception in server network thread" << std::endl;
    }
    std::cout << "Server network thread stopped" << std::endl;
};

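When running it I get "Server network thread stopped". The io_context doesn't seem to start and doesn't block. Does anyone have an idea what I am doing wrong? Thanks a lot!

EDIT: after the comments I tried this, with the same result (except that the message appears after 1 second):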

UDPServer::UDPServer(uint16_t port):
endpoint_(boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), port)),
io_context_(),
socket_(io_context_, boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), port))
{
        start_receive();
        std::this_thread::sleep_for (std::chrono::seconds(1));
        thread_ = std::thread(&UDPServer::run, this);
}

Your destructor explicitly tells the service to stop:

UDPServer::~UDPServer() {
    io_context_.stop();
    thread_.join();
}

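That's part of your problem. The other part is as pointed out in the comments: you have a race condition where the thread exits before you even post your first async operation.

Solve it by adding a work guard: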

boost::asio::io_context io_;
boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work_ {io_.get_executor()};

Now the destructor can be:

UDPServer::~UDPServer() {
    work_.reset(); // allow service to run out of work
    thread_.join();
}

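Other notes:

  • Avoid chaining back to start_receive when an error has occurred

  • std::to_string is redundant

  • The order of initialization for members is determined by the order in which they are declared, not by the order in which they appear in the initializer list. Use -Wall -Wextra -pedantic to catch these sources of bugs

  • Handle exceptions in your service thread (see Should the exception thrown by boost::asio::io_service::run() be caught?)

  • I suggest std::bind over boost::bind: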

    std::bind(&UDPServer::handle_reply, this,
              std::placeholders::_1,
              std::placeholders::_2));
    
  • Or just use a lambda:

     [this](error_code ec, size_t xfer) { handle_reply(ec, xfer); });
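To make the note on initialization order concrete, here is a small standard-C++ sketch. The types Tracer and Demo and the function construction_order are hypothetical names invented for this illustration. It records the order in which two members are actually constructed when the initializer list names them the other way around.

```cpp
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper: appends its name to a log when it is constructed.
struct Tracer {
    Tracer(std::vector<std::string>& log, std::string name) {
        log.push_back(std::move(name));
    }
};

struct Demo {
    // Declaration order: first_, then second_.
    Tracer first_;
    Tracer second_;

    // The initializer list names second_ before first_, but that has no
    // effect: first_ is still constructed first. GCC and Clang report this
    // mismatch under -Wall (the -Wreorder warning).
    explicit Demo(std::vector<std::string>& log)
        : second_(log, "second_"), first_(log, "first_") {}
};

// Returns the names of the members in the order they were constructed.
std::vector<std::string> construction_order() {
    std::vector<std::string> log;
    Demo d(log);
    return log;
}
```

Calling construction_order() yields "first_" followed by "second_". The same rule is why a std::thread member that uses other members is safest when declared last.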
    

Live demo (on Compiler Explorer):

#include <boost/asio.hpp>
#include <array>
#include <fstream>
#include <functional>
#include <iomanip>
#include <iostream>
#include <string_view>
#include <thread>
#include <vector>

using boost::asio::ip::udp;
using boost::system::error_code;
using boost::asio::io_context;

#define BUFFERSIZE 1501

class UDPServer {
  public:
    explicit UDPServer(uint16_t port);
    virtual ~UDPServer();

  private:
    io_context io_;
    boost::asio::executor_work_guard<io_context::executor_type> work_ {io_.get_executor()};
    udp::endpoint endpoint_;
    udp::socket socket_;
    std::array<char, BUFFERSIZE> recv_buffer_;
    std::thread thread_;

    void run();
    void start_receive();
    void handle_reply(const error_code& error, size_t transferred);
};

UDPServer::UDPServer(uint16_t port)
        : endpoint_(udp::endpoint(udp::v4(), port)),
          socket_(io_, endpoint_), 
          thread_(&UDPServer::run, this) {
    start_receive();
}

UDPServer::~UDPServer() {
    work_.reset(); // allow service to run out of work
    thread_.join();
}

void UDPServer::start_receive() {
    socket_.async_receive_from(boost::asio::buffer(recv_buffer_), endpoint_,
#if 0
            std::bind(&UDPServer::handle_reply, this,
                std::placeholders::_1,
                std::placeholders::_2));
#else
            [this](error_code ec, size_t xfer) { handle_reply(ec, xfer); });
#endif
}

void UDPServer::handle_reply(const error_code& error, size_t transferred) {
    if (!error) {
        try {
            std::string_view s(recv_buffer_.data(), transferred);
            std::cout << "Message received: " << transferred << ", "
                      << std::quoted(s) << "\n";
        } catch (std::exception const& ex) {
            std::cout << "handle_reply: Error parsing incoming message:"
                      << ex.what() << "\n";
        } catch (...) {
            std::cout
                << "handle_reply: Unknown error while parsing incoming message\n";
        }

        start_receive();
    } else {
        std::cout << "handle_reply: error: " << error.message() << "\n";
    }
}

void UDPServer::run() {
    while (true) {
        try {
            if (io_.run() == 0u) {
                break;
            }
        } catch (const std::exception& e) {
            std::cout << "Server network exception: " << e.what() << "\n";
        } catch (...) {
            std::cout << "Unknown exception in server network thread\n";
        }
    }
    std::cout << "Server network thread stopped\n";
}

int main() {
    std::cout << std::unitbuf;
    UDPServer server(60001);
}

Testing with random words:

sort -R /etc/dictionaries-common/words | while read w; do sleep 1; netcat -u localhost 60001 -w 0 <<<"$w"; done

Live output: