boost::asio UDP Broadcast Client Only Receives "fast" Packets

I wrote a UDP broadcast client using boost::asio. It works, but with one caveat: if I send packets very quickly (at least one every ~100 ms), it appears to receive them all. However, if I send just a single packet, it doesn't seem to pick it up. I'm using an asynchronous receive, so I can't imagine why it doesn't work. The data itself is quite small and always less than the allocated buffer size. When the client does receive the "fast" packets, they are correct and contain only the data from a single send. In the debugger, it breaks correctly for every packet sent.

Header:

class BroadcastClient
    {
    public:
        BroadcastClient();
        std::optional<std::string> poll();

    protected:
        void handle_read(const boost::system::error_code& error, std::size_t bytes_transferred);

    private:
        std::future<void> ioFuture;
        std::vector<uint8_t> buffer;
        std::string result;
        boost::asio::io_service ioService;
        std::unique_ptr<boost::asio::ip::udp::socket> socket;
        uint16_t port{ 8888 };
        boost::asio::ip::udp::endpoint sender_endpoint;
    };

Implementation:

BroadcastClient::BroadcastClient()
{
    this->socket = std::make_unique<boost::asio::ip::udp::socket>(
        this->ioService, boost::asio::ip::udp::endpoint(boost::asio::ip::address_v4::any(), this->port));

    this->socket->set_option(boost::asio::socket_base::broadcast(true));
    this->socket->set_option(boost::asio::socket_base::reuse_address(true));

    this->ioFuture = std::async(std::launch::async, [this] { this->ioService.run(); });
    this->buffer.resize(4096);

    this->socket->async_receive_from(
        boost::asio::buffer(this->buffer, this->buffer.size()), sender_endpoint,
        boost::bind(&BroadcastClient::handle_read, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}

void BroadcastClient::handle_read(const boost::system::error_code& error, std::size_t bytes_transferred)
{
    if(!error)
    {
        this->result += std::string(std::begin(buffer), std::begin(buffer) + buffer.size());
        std::fill(std::begin(buffer), std::end(buffer), 0);
        
        this->socket->async_receive_from(
            boost::asio::buffer(this->buffer, this->buffer.size()), sender_endpoint,
            boost::bind(&BroadcastClient::handle_read, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
    }
}

std::optional<std::string> BroadcastClient::poll()
{
    if(this->result.empty() == false)
    {
        auto copy = this->result;
        this->result.clear();
        return copy;
    }

    return {};
}

I searched long and hard, because broadcast UDP can be finicky. Then I spotted your future<void>. Not only do I not trust std::async to do what you expect (it can do almost anything), but there is also a potentially fatal race, which is 99% certain to be your problem:

  • You start the asynchronous task; it will run at some point in the future.

  • Then you add the async_receive_from operation. If the task has already started, the queue will have been empty, run() completes, and the future becomes ready. In fact, this is visible when you do:

     ioService.run();
     std::clog << "End of run " << std::boolalpha << ioService.stopped() << std::endl;
    

which prints

End of run true

most of the time, for me. I suggest using a thread:

ioThread = std::thread([this] {
    ioService.run();
    std::clog << "End of run " << std::boolalpha << ioService.stopped() << std::endl;
});

With a corresponding join:

~BroadcastClient() {
    std::clog << "~BroadcastClient()" << std::endl;
    ioThread.join();
}

To be complete, also handle exceptions (see "Should the exception thrown by boost::asio::io_service::run() be caught?"), or use thread_pool(1), which is nice because it also replaces your io_service.
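
For illustration, a minimal sketch of the thread_pool(1) variant (my own example, not code from the question):

#include <boost/asio.hpp>
#include <iostream>

int main() {
    boost::asio::thread_pool io(1); // one service thread; replaces io_service + std::thread

    // unlike io_service::run(), the pool thread does not return merely
    // because the queue is momentarily empty; it waits until join()
    boost::asio::post(io, [] { std::cout << "ran on the pool thread\n"; });

    io.join(); // blocks until all outstanding work has completed
}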

Alternatively, use a work guard (io_service::work or make_work_guard):
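
A minimal sketch of the work-guard approach, again my own illustration:

#include <boost/asio.hpp>
#include <thread>

int main() {
    boost::asio::io_service io;

    // the guard keeps run() from returning while the queue is momentarily empty
    auto work = boost::asio::make_work_guard(io);

    std::thread t([&io] { io.run(); });

    // ... initiate async operations here, e.g. async_receive_from ...

    work.reset(); // allow run() to return once the remaining queue drains
    t.join();
}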

Now, I can't seem to make it drop packets when testing locally.

More remarks

  1. In general, you want to find out about error conditions in your code as early as possible, so report the error in handle_read, because such a condition causes the async loop to terminate. See the fixed handle_read below.

  2. The result buffer is not thread-safe, yet you access it from multiple threads¹. That invokes Undefined Behavior. Add synchronization (a sketch follows the footnote below), or use e.g. atomic exchanges.

    ¹ To be sure the poll happens on the service thread, you would have to post the poll operation to the io_service. That is not possible here, because the service is private.
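
    A minimal sketch of the locked-exchange idea (member names follow the full listing further below):

    std::optional<std::string> BroadcastClient::poll() {
        std::string out;
        {
            std::lock_guard<std::mutex> lk(result_mx); // same mutex handle_read locks
            out.swap(result); // take the accumulated data, leave result empty
        }
        if (out.empty())
            return {};
        return out;
    }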

  3. You use buffer.size() in handle_read, but that is the hard-coded size (4096). You probably want bytes_transferred:

    result.append(std::begin(buffer), std::begin(buffer) + bytes_transferred);
    

    This also avoids the unnecessary temporary. Also, you no longer have to reset the buffer to zeros:

    void BroadcastClient::handle_read(const boost::system::error_code& error, std::size_t bytes_transferred) {
        if (!error) {
            std::lock_guard lk(result_mx);
            result.append(std::begin(buffer), std::begin(buffer) + bytes_transferred);
    
            start_read();
        } else {
            std::clog << "handle_read: " << error.message() << std::endl;
        }
    }
    
  4. Why is socket dynamically instantiated? In fact, you should initialize it in the constructor's initializer list or, since C++11, from an NSDMI:

    uint16_t port{ 8888 };
    boost::asio::io_service ioService;
    udp::socket socket { ioService, { {}, port } };
    
  5. The async_receive_from call is duplicated. This calls for a start_read method or similar. Also, consider using a lambda to reduce the code instead of relying on old-fashioned boost::bind:

    void BroadcastClient::start_read() {
        socket.async_receive_from(
            boost::asio::buffer(buffer), sender_endpoint,
            [this](auto ec, size_t xfr) { handle_read(ec, xfr); });
    }
    

Full Listing

Live On Coliru

#include <boost/asio.hpp>
#include <iostream>
#include <iomanip>
#include <thread>
#include <mutex>
using namespace std::chrono_literals;

class BroadcastClient {
    using socket_base = boost::asio::socket_base;
    using udp = boost::asio::ip::udp;
  public:
    BroadcastClient();

    ~BroadcastClient() {
        std::clog << "~BroadcastClient()" << std::endl;
        socket.cancel();
        work.reset();
        ioThread.join();
    }
    std::optional<std::string> poll();

  protected:
    void start_read();
    void handle_read(const boost::system::error_code& error, std::size_t bytes_transferred);

  private:
    uint16_t port{ 8888 };
    boost::asio::io_service ioService;
    boost::asio::executor_work_guard<
        boost::asio::io_service::executor_type> work { ioService.get_executor() };
    udp::socket socket { ioService, { {}, port } };

    std::thread ioThread;
    std::string buffer = std::string(4096, '\0');
    std::mutex result_mx;
    std::string result;
    udp::endpoint sender_endpoint;
};

BroadcastClient::BroadcastClient() {
    socket.set_option(socket_base::broadcast(true));
    socket.set_option(socket_base::reuse_address(true));

    ioThread = std::thread([this] {
        ioService.run();
        std::clog << "Service thread, stopped? " << std::boolalpha << ioService.stopped() << std::endl;
    });

    start_read(); // actually okay now because of `work` guard
}

void BroadcastClient::start_read() {
    socket.async_receive_from(
        boost::asio::buffer(buffer), sender_endpoint,
        [this](auto ec, size_t xfr) { handle_read(ec, xfr); });
}

void BroadcastClient::handle_read(const boost::system::error_code& error, std::size_t bytes_transferred) {
    if (!error) {
        std::lock_guard lk(result_mx);
        result.append(std::begin(buffer), std::begin(buffer) + bytes_transferred);

        start_read();
    } else {
        std::clog << "handle_read: " << error.message() << std::endl;
    }
}

std::optional<std::string> BroadcastClient::poll() {
    std::lock_guard lk(result_mx);
    if (result.empty())
        return std::nullopt;
    else 
        return std::move(result);
}

constexpr auto now = std::chrono::steady_clock::now;

int main() {
    BroadcastClient bcc;

    for (auto start = now(); now() - start < 3s;) {
        if (auto r = bcc.poll())
            std::cout << std::quoted(r.value()) << std::endl;

        std::this_thread::sleep_for(100ms);
    }
} // BroadcastClient destructor safely cancels the work

Live test

g++ -std=c++17 -O2 -Wall -pedantic -pthread main.cpp
while sleep .05; do echo -n "hello world $RANDOM" | netcat -w 0 -u 127.0.0.1 8888 ; done&
./a.out
kill %1

Prints

"hello world 18422"
"hello world 3810"
"hello world 26191hello world 10419"
"hello world 23666hello world 18552"
"hello world 2076"
"hello world 19871hello world 8978"
"hello world 1836"
"hello world 11134hello world 16603"
"hello world 3748hello world 8089"
"hello world 27946"
"hello world 14834hello world 15274"
"hello world 26555hello world 6695"
"hello world 32419"
"hello world 26996hello world 26796"
"hello world 9882"
"hello world 680hello world 29358"
"hello world 9723hello world 31163"
"hello world 3646"
"hello world 10602hello world 22562"
"hello world 18394hello world 17229"
"hello world 20028"
"hello world 14444hello world 3890"
"hello world 16258"
"hello world 28555hello world 21184"
"hello world 31342hello world 30891"
"hello world 3088"
"hello world 1051hello world 5638"
"hello world 24308hello world 7748"
"hello world 18398"
~BroadcastClient()
handle_read: Operation canceled
Service thread, stopped? true

Old answer content that may still be of interest

Wait. I noticed this isn't "regular" peer-to-peer UDP.

As I understand it, multicast is provided by routers. They have to maintain complicated tables of "subscribed" endpoints so they know where to forward the actual packets.

Many routers struggle with this, and reliability is inherently flawed, especially over WiFi and the like. It would not surprise me if you have a router here (or rather, a topology that includes a router) that struggles with this too and simply stops "remembering" the participating endpoints in the multicast group after some interval.

I believe such a table has to be kept at every hop along the route (including the kernel, which may have to keep track of several processes for the same multicast group).
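
For reference, a minimal sketch of what creates such a membership on the endpoint side (the group address is an arbitrary assumption):

#include <boost/asio.hpp>

int main() {
    using boost::asio::ip::udp;
    boost::asio::io_service io;

    udp::socket socket(io, udp::endpoint(udp::v4(), 8888));

    // joining the group is what the kernel/routers have to track;
    // 239.255.0.1 is just an administratively-scoped example address
    socket.set_option(boost::asio::ip::multicast::join_group(
        boost::asio::ip::make_address_v4("239.255.0.1")));
}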

Some hints regarding this:

An often-heard suggestion is:

  • If you can, use multicast for discovery only, and switch to unicast afterwards.
  • Try to be specific about the bound interface (in your code, you might want to replace address_v4::any() with lo (127.0.0.1), or whatever IP address identifies your NIC); a minimal sketch follows below.
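
A hedged sketch of that last point, binding to one concrete interface (the address is an assumption for local testing):

#include <boost/asio.hpp>

int main() {
    using boost::asio::ip::udp;
    boost::asio::io_service io;

    // bind to one specific local address instead of address_v4::any();
    // 127.0.0.1 only works for loopback tests - otherwise use the IP
    // that identifies the NIC facing your broadcast network
    udp::socket socket(io,
        udp::endpoint(boost::asio::ip::make_address_v4("127.0.0.1"), 8888));

    socket.set_option(boost::asio::socket_base::broadcast(true));
}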