Boost Asio 和 Udp Poll() 没有传入数据
Boost Asio and Udp Poll() No incoming data
我必须以每秒 100 毫秒的速度并行处理来自 100 个端口的信息。
我正在使用 Ubuntu OS.
我做了一些研究,发现 poll()
函数是一个很好的候选者,以避免打开 100 个线程来处理来自 udp 协议的并行数据。
我用 boost 完成了主要部分,我尝试将 poll() 与 boost 集成。
问题是当我尝试通过客户端向服务器发送数据时,我什么也没收到。
根据 wireshark,数据正在正确的主机上。 (本地主机,端口 1234)
我是不是遗漏了什么,还是我写错了什么?
测试代码(服务器) :
#include <deque>
#include <iostream>
#include <chrono>
#include <thread>
#include <sys/poll.h>
#include <boost/optional.hpp>
#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
using boost::asio::ip::udp;
using namespace boost::asio;
using namespace std::chrono_literals;
std::string ip_address = "127.0.0.1";
template<typename T, size_t N>
size_t arraySize( T(&)[N] )
{
return(N);
}
class UdpReceiver
{
using Resolver = udp::resolver;
using Sockets = std::deque<udp::socket>;
using EndPoint = udp::endpoint;
using Buffer = std::array<char, 100>; // receiver buffer
public:
explicit UdpReceiver()
: work_(std::ref(resolver_context)), thread_( [this]{ resolver_context.run(); })
{ }
~UdpReceiver()
{
work_ = boost::none; // using work to keep run active always !
thread_.join();
}
void async_resolve(udp::resolver::query const& query_) {
resolver_context.post([this, query_] { do_resolve(query_); });
}
// callback for event-loop in main thread
void run_handler(int fd_idx) {
// start reading
auto result = read(fd_idx, receive_buf.data(), sizeof(Buffer));
// increment number of received packets
received_packets = received_packets + 1;
std::cout << "Received bytes " << result << " current recorded packets " << received_packets <<'\n';
// run handler posted from resolver threads
handler_context.poll();
handler_context.reset();
}
static void handle_receive(boost::system::error_code error, udp::resolver::iterator const& iterator) {
std::cout << "handle_resolve:\n"
" " << error.message() << "\n";
if (!error)
std::cout << " " << iterator->endpoint() << "\n";
}
// get current file descriptor
int fd(size_t idx)
{
return sockets[idx].native_handle();
}
private:
void do_resolve(boost::asio::ip::udp::resolver::query const& query_) {
boost::system::error_code error;
Resolver resolver(resolver_context);
Resolver::iterator result = resolver.resolve(query_, error);
sockets.emplace_back(udp::socket(resolver_context, result->endpoint()));
// post handler callback to service running in main thread
resolver_context.post(boost::bind(&UdpReceiver::handle_receive, error, result));
}
private:
Sockets sockets;
size_t received_packets = 0;
EndPoint remote_receiver;
Buffer receive_buf {};
io_context resolver_context;
io_context handler_context;
boost::optional<boost::asio::io_context::work> work_;
std::thread thread_;
};
int main (int argc, char** argv)
{
UdpReceiver udpReceiver;
udpReceiver.async_resolve(udp::resolver::query(ip_address, std::to_string(1234)));
//logic
pollfd fds[2] { };
for(int i = 0; i < arraySize(fds); ++i)
{
fds[i].fd = udpReceiver.fd(0);
fds[i].events = 0;
fds[i].events |= POLLIN;
fcntl(fds[i].fd, F_SETFL, O_NONBLOCK);
}
// simple event-loop
while (true) {
if (poll(fds, arraySize(fds), -1)) // waiting for wakeup call. Timeout - inf
{
for(auto &fd : fds)
{
if(fd.revents & POLLIN) // checking if we have something to read
{
fd.revents = 0; // reset kernel message
udpReceiver.run_handler(fd.fd); // call resolve handler. Do read !
}
}
}
}
return 0;
}
这看起来像是 C 风格 poll
代码和 Asio 代码的混淆。重点是
- 你不需要轮询(Asio 在内部进行(或 epoll/select/kqueue/IOCP - 任何可用的)
- UDP 是无连接的,因此您不需要一个以上的套接字来接收所有“连接”(发送方)
我会在单个线程上用单个 udp::socket
替换它。您甚至不必管理 thread/work:
net::thread_pool io(1); // single threaded
udp::socket s{io, {{}, 1234}};
让我们运行一个5秒的异步接收循环:
std::array<char, 100> receive_buffer;
udp::endpoint sender;
std::function<void(error_code, size_t)> read_loop;
read_loop = [&](error_code ec, size_t bytes) {
if (bytes != size_t(-1)) {
//std::cout << "read_loop (" << ec.message() << ")\n";
if (ec)
return;
received_packets += 1;
unique_senders.insert(sender);
//std::cout << "Received:" << bytes << " sender:" << sender << " recorded:" << received_packets << "\n";
//std::cout << std::string_view(receive_buffer.data(), bytes) << "\n";
}
s.async_receive_from(net::buffer(receive_buffer), sender, read_loop);
};
read_loop(error_code{}, -1); // prime the async pump
// after 5s stop
std::this_thread::sleep_for(5s);
post(io, [&s] { s.cancel(); });
io.join();
最后,我们可以上报统计:
std::cout << "A total of " << received_packets << " were received from "
<< unique_senders.size() << " unique senders\n";
在 bash
中使用模拟负载:
function client() { while read a; do echo "$a" > /dev/udp/localhost/1234 ; done < /etc/dictionaries-common/words; }
for a in {1..20}; do client& done; time wait
我们得到:
A total of 294808 were received from 28215 unique senders
real 0m5,007s
user 0m0,801s
sys 0m0,830s
这显然没有优化,这里的瓶颈可能是 很多很多 bash 子外壳正在为客户端启动。
完整列表
#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <iostream>
#include <set>
namespace net = boost::asio;
using boost::asio::ip::udp;
using boost::system::error_code;
using namespace std::chrono_literals;
int main ()
{
net::thread_pool io(1); // single threaded
udp::socket s{io, {{}, 1234}};
std::set<udp::endpoint> unique_senders;
size_t received_packets = 0;
{
std::array<char, 100> receive_buffer;
udp::endpoint sender;
std::function<void(error_code, size_t)> read_loop;
read_loop = [&](error_code ec, size_t bytes) {
if (bytes != size_t(-1)) {
//std::cout << "read_loop (" << ec.message() << ")\n";
if (ec)
return;
received_packets += 1;
unique_senders.insert(sender);
//std::cout << "Received:" << bytes << " sender:" << sender << " recorded:" << received_packets << "\n";
//std::cout << std::string_view(receive_buffer.data(), bytes) << "\n";
}
s.async_receive_from(net::buffer(receive_buffer), sender, read_loop);
};
read_loop(error_code{}, -1); // prime the async pump
// after 5s stop
std::this_thread::sleep_for(5s);
post(io, [&s] { s.cancel(); });
io.join();
}
std::cout << "A total of " << received_packets << " were received from "
<< unique_senders.size() << " unique senders\n";
}
我必须以每秒 100 毫秒的速度并行处理来自 100 个端口的信息。
我正在使用 Ubuntu OS.
我做了一些研究,发现 poll()
函数是一个很好的候选者,以避免打开 100 个线程来处理来自 udp 协议的并行数据。
我用 boost 完成了主要部分,我尝试将 poll() 与 boost 集成。
问题是当我尝试通过客户端向服务器发送数据时,我什么也没收到。 根据 wireshark,数据正在正确的主机上。 (本地主机,端口 1234)
我是不是遗漏了什么,还是我写错了什么?
测试代码(服务器) :
#include <deque>
#include <iostream>
#include <chrono>
#include <thread>
#include <sys/poll.h>
#include <boost/optional.hpp>
#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
using boost::asio::ip::udp;
using namespace boost::asio;
using namespace std::chrono_literals;
std::string ip_address = "127.0.0.1";
template<typename T, size_t N>
size_t arraySize( T(&)[N] )
{
return(N);
}
class UdpReceiver
{
using Resolver = udp::resolver;
using Sockets = std::deque<udp::socket>;
using EndPoint = udp::endpoint;
using Buffer = std::array<char, 100>; // receiver buffer
public:
explicit UdpReceiver()
: work_(std::ref(resolver_context)), thread_( [this]{ resolver_context.run(); })
{ }
~UdpReceiver()
{
work_ = boost::none; // using work to keep run active always !
thread_.join();
}
void async_resolve(udp::resolver::query const& query_) {
resolver_context.post([this, query_] { do_resolve(query_); });
}
// callback for event-loop in main thread
void run_handler(int fd_idx) {
// start reading
auto result = read(fd_idx, receive_buf.data(), sizeof(Buffer));
// increment number of received packets
received_packets = received_packets + 1;
std::cout << "Received bytes " << result << " current recorded packets " << received_packets <<'\n';
// run handler posted from resolver threads
handler_context.poll();
handler_context.reset();
}
static void handle_receive(boost::system::error_code error, udp::resolver::iterator const& iterator) {
std::cout << "handle_resolve:\n"
" " << error.message() << "\n";
if (!error)
std::cout << " " << iterator->endpoint() << "\n";
}
// get current file descriptor
int fd(size_t idx)
{
return sockets[idx].native_handle();
}
private:
void do_resolve(boost::asio::ip::udp::resolver::query const& query_) {
boost::system::error_code error;
Resolver resolver(resolver_context);
Resolver::iterator result = resolver.resolve(query_, error);
sockets.emplace_back(udp::socket(resolver_context, result->endpoint()));
// post handler callback to service running in main thread
resolver_context.post(boost::bind(&UdpReceiver::handle_receive, error, result));
}
private:
Sockets sockets;
size_t received_packets = 0;
EndPoint remote_receiver;
Buffer receive_buf {};
io_context resolver_context;
io_context handler_context;
boost::optional<boost::asio::io_context::work> work_;
std::thread thread_;
};
int main (int argc, char** argv)
{
UdpReceiver udpReceiver;
udpReceiver.async_resolve(udp::resolver::query(ip_address, std::to_string(1234)));
//logic
pollfd fds[2] { };
for(int i = 0; i < arraySize(fds); ++i)
{
fds[i].fd = udpReceiver.fd(0);
fds[i].events = 0;
fds[i].events |= POLLIN;
fcntl(fds[i].fd, F_SETFL, O_NONBLOCK);
}
// simple event-loop
while (true) {
if (poll(fds, arraySize(fds), -1)) // waiting for wakeup call. Timeout - inf
{
for(auto &fd : fds)
{
if(fd.revents & POLLIN) // checking if we have something to read
{
fd.revents = 0; // reset kernel message
udpReceiver.run_handler(fd.fd); // call resolve handler. Do read !
}
}
}
}
return 0;
}
这看起来像是 C 风格 poll
代码和 Asio 代码的混淆。重点是
- 你不需要轮询(Asio 在内部进行(或 epoll/select/kqueue/IOCP - 任何可用的)
- UDP 是无连接的,因此您不需要一个以上的套接字来接收所有“连接”(发送方)
我会在单个线程上用单个 udp::socket
替换它。您甚至不必管理 thread/work:
net::thread_pool io(1); // single threaded
udp::socket s{io, {{}, 1234}};
让我们运行一个5秒的异步接收循环:
std::array<char, 100> receive_buffer;
udp::endpoint sender;
std::function<void(error_code, size_t)> read_loop;
read_loop = [&](error_code ec, size_t bytes) {
if (bytes != size_t(-1)) {
//std::cout << "read_loop (" << ec.message() << ")\n";
if (ec)
return;
received_packets += 1;
unique_senders.insert(sender);
//std::cout << "Received:" << bytes << " sender:" << sender << " recorded:" << received_packets << "\n";
//std::cout << std::string_view(receive_buffer.data(), bytes) << "\n";
}
s.async_receive_from(net::buffer(receive_buffer), sender, read_loop);
};
read_loop(error_code{}, -1); // prime the async pump
// after 5s stop
std::this_thread::sleep_for(5s);
post(io, [&s] { s.cancel(); });
io.join();
最后,我们可以上报统计:
std::cout << "A total of " << received_packets << " were received from "
<< unique_senders.size() << " unique senders\n";
在 bash
中使用模拟负载:
function client() { while read a; do echo "$a" > /dev/udp/localhost/1234 ; done < /etc/dictionaries-common/words; }
for a in {1..20}; do client& done; time wait
我们得到:
A total of 294808 were received from 28215 unique senders
real 0m5,007s
user 0m0,801s
sys 0m0,830s
这显然没有优化,这里的瓶颈可能是 很多很多 bash 子外壳正在为客户端启动。
完整列表
#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <iostream>
#include <set>
namespace net = boost::asio;
using boost::asio::ip::udp;
using boost::system::error_code;
using namespace std::chrono_literals;
int main ()
{
net::thread_pool io(1); // single threaded
udp::socket s{io, {{}, 1234}};
std::set<udp::endpoint> unique_senders;
size_t received_packets = 0;
{
std::array<char, 100> receive_buffer;
udp::endpoint sender;
std::function<void(error_code, size_t)> read_loop;
read_loop = [&](error_code ec, size_t bytes) {
if (bytes != size_t(-1)) {
//std::cout << "read_loop (" << ec.message() << ")\n";
if (ec)
return;
received_packets += 1;
unique_senders.insert(sender);
//std::cout << "Received:" << bytes << " sender:" << sender << " recorded:" << received_packets << "\n";
//std::cout << std::string_view(receive_buffer.data(), bytes) << "\n";
}
s.async_receive_from(net::buffer(receive_buffer), sender, read_loop);
};
read_loop(error_code{}, -1); // prime the async pump
// after 5s stop
std::this_thread::sleep_for(5s);
post(io, [&s] { s.cancel(); });
io.join();
}
std::cout << "A total of " << received_packets << " were received from "
<< unique_senders.size() << " unique senders\n";
}