Boost Asio 和 Udp Poll() 没有传入数据

Boost Asio and Udp Poll() No incoming data

我必须以每秒 100 毫秒的速度并行处理来自 100 个端口的信息。

我正在使用 Ubuntu OS.

我做了一些研究,发现 poll() 函数是一个很好的候选者,以避免打开 100 个线程来处理来自 udp 协议的并行数据。

我用 boost 完成了主要部分,我尝试将 poll() 与 boost 集成。

问题是当我尝试通过客户端向服务器发送数据时,我什么也没收到。 根据 wireshark,数据正在正确的主机上。 (本地主机,端口 1234)

我是不是遗漏了什么,还是我写错了什么?

测试代码(服务器) :

#include <deque>
#include <iostream>
#include <chrono>
#include <thread>

#include <sys/poll.h>

#include <boost/optional.hpp>
#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>


using boost::asio::ip::udp;
using namespace boost::asio;

using namespace std::chrono_literals;

std::string ip_address = "127.0.0.1";

template<typename T, size_t N>
size_t arraySize( T(&)[N] )
{
    return(N);
}

class UdpReceiver
{
    using Resolver   = udp::resolver;
    using Sockets    = std::deque<udp::socket>;
    using EndPoint   = udp::endpoint;
    using Buffer     = std::array<char, 100>; // receiver buffer

public:
    explicit UdpReceiver()
            : work_(std::ref(resolver_context)), thread_( [this]{ resolver_context.run(); })
    { }

    ~UdpReceiver()
    {
        work_ = boost::none; // using work to keep run active always !
        thread_.join();
    }

    void async_resolve(udp::resolver::query const& query_) {
        resolver_context.post([this, query_] { do_resolve(query_); });
    }

    // callback for event-loop in main thread
    void run_handler(int fd_idx) {
        // start reading
        auto result = read(fd_idx, receive_buf.data(), sizeof(Buffer));
        // increment number of received packets
        received_packets = received_packets + 1;
        std::cout << "Received bytes " << result << " current recorded packets " << received_packets <<'\n';

        // run handler posted from resolver threads
        handler_context.poll();
        handler_context.reset();
    }

    static void handle_receive(boost::system::error_code error, udp::resolver::iterator const& iterator) {
        std::cout << "handle_resolve:\n"
                     "  " << error.message() << "\n";
        if (!error)
            std::cout << "  " << iterator->endpoint() << "\n";
    }

    // get current file descriptor
    int fd(size_t idx)
    {
        return sockets[idx].native_handle();
    }

private:

    void do_resolve(boost::asio::ip::udp::resolver::query const& query_) {

        boost::system::error_code error;
        Resolver resolver(resolver_context);
        Resolver::iterator result = resolver.resolve(query_, error);

        sockets.emplace_back(udp::socket(resolver_context, result->endpoint()));

        // post handler callback to service running in main thread
        resolver_context.post(boost::bind(&UdpReceiver::handle_receive, error, result));
    }

private:
    Sockets sockets;
    size_t received_packets = 0;
    EndPoint remote_receiver;
    Buffer receive_buf {};

    io_context resolver_context;
    io_context handler_context;
    boost::optional<boost::asio::io_context::work> work_;
    std::thread thread_;
};

int main (int argc, char** argv)
{
    UdpReceiver udpReceiver;
    udpReceiver.async_resolve(udp::resolver::query(ip_address, std::to_string(1234)));

    //logic
    pollfd fds[2] { };
    for(int i = 0; i < arraySize(fds); ++i)
    {
        fds[i].fd = udpReceiver.fd(0);
        fds[i].events = 0;
        fds[i].events |= POLLIN;
        fcntl(fds[i].fd, F_SETFL, O_NONBLOCK);
    }

    // simple event-loop
    while (true) {
        if (poll(fds, arraySize(fds), -1)) // waiting for wakeup call. Timeout - inf
        {
            for(auto &fd : fds)
            {
                if(fd.revents & POLLIN) // checking if we have something to read
                {
                    fd.revents = 0; // reset kernel message
                    udpReceiver.run_handler(fd.fd); // call resolve handler. Do read !
                }
            }
        }
    }
    return 0;
}

这看起来像是 C 风格 poll 代码和 Asio 代码的混淆。重点是

  • 你不需要轮询(Asio 在内部进行(或 epoll/select/kqueue/IOCP - 任何可用的)
  • UDP 是无连接的,因此您不需要一个以上的套接字来接收所有“连接”(发送方)

我会在单个线程上用单个 udp::socket 替换它。您甚至不必管理 thread/work:

net::thread_pool io(1); // single threaded
udp::socket s{io, {{}, 1234}};

让我们运行一个5秒的异步接收循环:

std::array<char, 100> receive_buffer;
udp::endpoint sender;

std::function<void(error_code, size_t)> read_loop;
read_loop = [&](error_code ec, size_t bytes) {
    if (bytes != size_t(-1)) {
        //std::cout << "read_loop (" << ec.message() << ")\n";
        if (ec)
            return;

        received_packets += 1;
        unique_senders.insert(sender);
        //std::cout << "Received:" << bytes << " sender:" << sender << " recorded:" << received_packets << "\n";
        //std::cout << std::string_view(receive_buffer.data(), bytes) << "\n";
    }
    s.async_receive_from(net::buffer(receive_buffer), sender, read_loop);
};

read_loop(error_code{}, -1); // prime the async pump

// after 5s stop
std::this_thread::sleep_for(5s);
post(io, [&s] { s.cancel(); });

io.join();

最后,我们可以上报统计:

std::cout << "A total of " << received_packets << " were received from "
          << unique_senders.size() << " unique senders\n";

bash 中使用模拟负载:

function client() { while read a; do echo "$a" > /dev/udp/localhost/1234 ; done < /etc/dictionaries-common/words; }

for a in {1..20}; do client& done; time wait

我们得到:

A total of 294808 were received from 28215 unique senders

real    0m5,007s
user    0m0,801s
sys     0m0,830s

这显然没有优化,这里的瓶颈可能是 很多很多 bash 子外壳正在为客户端启动。

完整列表

#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <iostream>
#include <set>

namespace net = boost::asio;
using boost::asio::ip::udp;
using boost::system::error_code;
using namespace std::chrono_literals;

int main ()
{
    net::thread_pool io(1); // single threaded
    udp::socket s{io, {{}, 1234}};

    std::set<udp::endpoint> unique_senders;
    size_t                  received_packets = 0;

    {
        std::array<char, 100> receive_buffer;
        udp::endpoint sender;

        std::function<void(error_code, size_t)> read_loop;
        read_loop = [&](error_code ec, size_t bytes) {
            if (bytes != size_t(-1)) {
                //std::cout << "read_loop (" << ec.message() << ")\n";
                if (ec)
                    return;

                received_packets += 1;
                unique_senders.insert(sender);
                //std::cout << "Received:" << bytes << " sender:" << sender << " recorded:" << received_packets << "\n";
                //std::cout << std::string_view(receive_buffer.data(), bytes) << "\n";
            }
            s.async_receive_from(net::buffer(receive_buffer), sender, read_loop);
        };

        read_loop(error_code{}, -1); // prime the async pump

        // after 5s stop
        std::this_thread::sleep_for(5s);
        post(io, [&s] { s.cancel(); });

        io.join();
    }

    std::cout << "A total of " << received_packets << " were received from "
              << unique_senders.size() << " unique senders\n";
}