C++ UDP 服务器 io_context 运行 在线程退出之前可以开始工作
C++ UDP Server io_context running in thread exits before work can start
我是 C++ 的新手,但到目前为止,大多数 asio 的东西都是有意义的。然而,我正在努力让我的 UDPServer 工作。
我的问题可能类似于:
我想我的 UDPServer 在工作可以交给它 io_context 之前就停止了。但是,我在调用 io_context.run() 之前向上下文发布工作,所以我不明白为什么。
当然,我不完全确定我的上述陈述是否正确,希望得到一些指导。这是我的 class:
template<typename message_T>
class UDPServer
{
public:
UDPServer(uint16_t port)
: m_socket(m_asioContext, asio::ip::udp::endpoint(asio::ip::udp::v4(), port))
{
m_port = port;
}
virtual ~UDPServer()
{
Stop();
}
public:
// Starts the server!
bool Start()
{
try
{
// Issue a task to the asio context
WaitForMessages();
m_threadContext = std::thread([this]() { m_asioContext.run(); });
}
catch (std::exception& e)
{
// Something prohibited the server from listening
std::cerr << "[SERVER @ PORT " << m_port << "] Exception: " << e.what() << "\n";
return false;
}
std::cout << "[SERVER @ PORT " << m_port << "] Started!\n";
return true;
}
// Stops the server!
void Stop()
{
// Request the context to close
m_asioContext.stop();
// Tidy up the context thread
if (m_threadContext.joinable()) m_threadContext.join();
// Inform someone, anybody, if they care...
std::cout << "[SERVER @ PORT " << m_port << "] Stopped!\n";
}
void WaitForMessages()
{
m_socket.async_receive_from(asio::buffer(vBuffer.data(), vBuffer.size()), m_endpoint,
[this](std::error_code ec, std::size_t length)
{
if (!ec)
{
std::cout << "[SERVER @ PORT " << m_port << "] Got " << length << " bytes \n Data: " << vBuffer.data() << "\n" << "Address: " << m_endpoint.address() << " Port: " << m_endpoint.port() << "\n" << "Data: " << m_endpoint.data() << "\n";
}
else
{
std::cerr << "[SERVER @ PORT " << m_port << "] Exception: " << ec.message() << "\n";
return;
}
WaitForMessages();
}
);
}
void Send(message_T& msg, const asio::ip::udp::endpoint& ep)
{
asio::post(m_asioContext,
[this, msg, ep]()
{
// If the queue has a message in it, then we must
// assume that it is in the process of asynchronously being written.
bool bWritingMessage = !m_messagesOut.empty();
m_messagesOut.push_back(msg);
if (!bWritingMessage)
{
WriteMessage(ep);
}
}
);
}
private:
void WriteMessage(const asio::ip::udp::endpoint& ep)
{
m_socket.async_send_to(asio::buffer(&m_messagesOut.front(), sizeof(message_T)), ep,
[this, ep](std::error_code ec, std::size_t length)
{
if (!ec)
{
m_messagesOut.pop_front();
// If the queue is not empty, there are more messages to send, so
// make this happen by issuing the task to send the next header.
if (!m_messagesOut.empty())
{
WriteMessage(ep);
}
}
else
{
std::cout << "[SERVER @ PORT " << m_port << "] Write Header Fail.\n";
m_socket.close();
}
});
}
void ReadMessage()
{
}
private:
uint16_t m_port = 0;
asio::ip::udp::endpoint m_endpoint;
std::vector<char> vBuffer = std::vector<char>(21);
protected:
TSQueue<message_T> m_messagesIn;
TSQueue<message_T> m_messagesOut;
Message<message_T> m_tempMessageBuf;
asio::io_context m_asioContext;
std::thread m_threadContext;
asio::ip::udp::socket m_socket;
};
}
暂时在主函数中调用代码:
enum class TestMsg {
Ping,
Join,
Leave
};
int main() {
Message<TestMsg> msg; // Message is a pretty basic struct that I'm not using yet. When I was, I was only receiving the first 4 bytes - which led me down this path of investigation
msg.id = TestMsg::Join;
msg << "hello";
UDPServer<Message<TestMsg>> server(60000);
}
调用时,服务器会在有机会打印“[SERVER] 已启动”之前立即退出
我会尝试按照 link post 的描述添加工作警卫,但我仍然想了解为什么 io_context 没有足够快地准备好工作。
Update(现在我也看了题而不仅仅是代码)
在 WaitForMessages
中,您确实通过调用 m_socket.async_receive_from
函数开始监听,因为它是异步的,该函数将在设置监听后立即 return/unblock 。因此,只要您实际上没有客户端向您发送内容,您的服务器就无能为力。只有当它收到一些东西时,回调才会被调用 io_context::run
的线程调用。所以你需要工作守卫,这样你的线程 运行ning run
不会在启动后立即解除阻塞,但只要工作守卫在那里就会阻塞。
如果在处理程序中抛出异常并且您仍想继续使用服务器,通常它还会与 try/while 模式结合使用。
同样在您发布的代码中,您实际上从未调用过 UDPServer::Start
!
这是我对答案的第一个想法:
This is normal behavior of ASIO. io_context::run
函数将在没有工作要做时立即 return。
所以要改变 run
函数的行为来阻止你必须使用 boost::asio::executor_work_guard<boost::asio::io_context::executor_type>
即所谓的工作守卫。使用对 io_context
的引用构造该对象并保存它,即不要让它破坏,只要你想让服务器 运行,即不想让 io_context::run
return没有工作的时候。
如此给出
boost::asio::io_context io_context_;
boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work_guard_;
然后你可以调用
work_guard_{boost::asio::make_work_guard(io_context_)},
const auto thread_count{std::max<unsigned>(std::thread::hardware_concurrency(), 1)};
std::generate_n(std::back_inserter(this->io_run_threads_),
thread_count,
[this]() {
return std::thread{io_run_loop,
std::ref(this->io_context_), std::ref(this->error_handler_)};
});
void io_run_loop(boost::asio::io_context &context,
const std::function<void(std::exception &)> &error_handler) {
while (true) {
try {
context.run();
break;
} catch (std::exception &e) {
error_handler(e);
}
}
}
然后关闭服务器:
work_guard_.reset();
io_context_.stop();
std::for_each(this->io_run_threads_.begin(), this->io_run_threads_.end(), [](auto &thread) {
if (thread.joinable()) thread.join();
});
为了更优雅地关闭,您可以省略 stop
调用,而是先关闭所有套接字。
你好像忘了打电话给 server.Start();
。此外,您需要让主线程等待一段时间,否则 Server
的析构函数将立即导致 Stop()
被调用:
int main()
{
Message<TestMsg> msg;
msg.id = TestMsg::Join;
msg << "hello";
UDPServer<Message<TestMsg>> server(60000);
server.Start();
std::this_thread::sleep_for(30s);
}
问题
Send
API 存在概念性问题。
它在每次调用时都需要一个端点,但它只使用启动写入调用链的端点!这意味着如果你这样做
srv.Send(msg1, {mymachine, 60001});
srv.Send(msg1, {otherserver, 5517});
很可能他们都被发送到 mymachine:60001。
您如何处理收到的缓冲区。只是盲目地使用 .data()
假设数据是 NUL 终止的。不要那样做:
std::string const data(vBuffer.data(), length);
此外,您似乎有时对数据和打印感到困惑 m_endpoint.data()
- 您的公主在另一座城堡中。
实际上,您可能想要提取键入数据的方法。我将其保留在今天这个问题的范围之外。
无论如何,您应该在重新使用之前清除缓冲区,因为您可能会在后续读取中看到旧数据。
vBuffer.assign(vBuffer.size(), '[=13=]');
这很有可能undefined behaviour:
asio::buffer(&m_messagesOut.front(), sizeof(message_T)), ep,
这仅在 message_T
是微不足道的标准布局(“POD”- 普通旧数据)时才有效。 operator<<
的存在强烈表明情况并非如此。
相反,构建一个(序列)缓冲区,将消息表示为原始字节,例如
auto& msg = m_messagesOut.front();
msg.length = msg.body.size();
m_socket.async_send_to(
std::vector<asio::const_buffer>{
asio::buffer(&msg.id, sizeof(msg.id)),
asio::buffer(&msg.length, sizeof(msg.length)),
asio::buffer(msg.body),
},
// ...
线程安全队列似乎有点矫枉过正,因为您只有一个服务线程;这是一个隐含的“链”,因此您可以 post 它具有单线程语义。
这里有一些改编以使其到目前为止有效(除了 reader 指出的练习):
#include <boost/asio.hpp>
#include <iostream>
#include <deque>
#include <sstream>
// Library facilities
namespace asio = boost::asio;
using asio::ip::udp;
using boost::system::error_code;
using namespace std::chrono_literals;
/////////////////////////////////
// mock ups:
template <typename message_T> struct Message {
message_T id;
uint16_t length; // automatically filled on send, UDP packets are < 64k
std::string body;
template <typename T> friend Message& operator<<(Message& m, T const& v)
{
std::ostringstream oss;
oss << v;
m.body += oss.str();
//m.body += '[=16=]'; // suggestion for easier message extraction
return m;
}
};
// Thread-safety can be replaced with the implicit strand of a single service
// thread
template <typename T> using TSQueue = std::deque<T>;
// end mock ups
/////////////////////////////////
template <typename message_T> class UDPServer {
public:
UDPServer(uint16_t port)
: m_socket(m_asioContext, udp::endpoint(udp::v4(), port))
{
m_port = port;
}
virtual ~UDPServer() { Stop(); }
public:
// Starts the server!
bool Start()
{
if (m_threadContext.joinable() && !m_asioContext.stopped())
return false;
try {
// Issue a task to the asio context
WaitForMessages();
m_threadContext = std::thread([this]() { m_asioContext.run(); });
} catch (std::exception const& e) {
// Something prohibited the server from listening
std::cerr << "[SERVER @ PORT " << m_port
<< "] Exception: " << e.what() << "\n";
return false;
}
std::cout << "[SERVER @ PORT " << m_port << "] Started!\n";
return true;
}
// Stops the server!
void Stop()
{
// Tell the context to stop processing
m_asioContext.stop();
// Tidy up the context thread
if (m_threadContext.joinable())
m_threadContext.join();
// Inform someone, anybody, if they care...
std::cout << "[SERVER @ PORT " << m_port << "] Stopped!\n";
m_asioContext
.reset(); // required in case you want to reuse this Server object
}
void Send(message_T& msg, const udp::endpoint& ep)
{
asio::post(m_asioContext, [this, msg, ep]() {
// If the queue has a message in it, then we must
// assume that it is in the process of asynchronously being written.
bool bWritingMessage = !m_messagesOut.empty();
m_messagesOut.push_back(msg);
if (!bWritingMessage) {
WriteMessage(ep);
}
});
}
private:
void WaitForMessages() // assumed to be on-strand
{
vBuffer.assign(vBuffer.size(), '[=16=]');
m_socket.async_receive_from(
asio::buffer(vBuffer.data(), vBuffer.size()), m_endpoint,
[this](std::error_code ec, std::size_t length) {
if (!ec) {
std::string const data(vBuffer.data(), length);
std::cout << "[SERVER @ PORT " << m_port << "] Got "
<< length << " bytes \n Data: " << data << "\n"
<< "Address: " << m_endpoint.address()
<< " Port: " << m_endpoint.port() << "\n"
<< std::endl;
} else {
std::cerr << "[SERVER @ PORT " << m_port
<< "] Exception: " << ec.message() << "\n";
return;
}
WaitForMessages();
});
}
void WriteMessage(const udp::endpoint& ep)
{
auto& msg = m_messagesOut.front();
msg.length = msg.body.size();
m_socket.async_send_to(
std::vector<asio::const_buffer>{
asio::buffer(&msg.id, sizeof(msg.id)),
asio::buffer(&msg.length, sizeof(msg.length)),
asio::buffer(msg.body),
},
ep, [this, ep](std::error_code ec, std::size_t length) {
if (!ec) {
m_messagesOut.pop_front();
// If the queue is not empty, there are more messages to
// send, so make this happen by issuing the task to send the
// next header.
if (!m_messagesOut.empty()) {
WriteMessage(ep);
}
} else {
std::cout << "[SERVER @ PORT " << m_port
<< "] Write Header Fail.\n";
m_socket.close();
}
});
}
private:
uint16_t m_port = 0;
udp::endpoint m_endpoint;
std::vector<char> vBuffer = std::vector<char>(21);
protected:
TSQueue<message_T> m_messagesIn;
TSQueue<message_T> m_messagesOut;
Message<message_T> m_tempMessageBuf;
asio::io_context m_asioContext;
std::thread m_threadContext;
udp::socket m_socket;
};
enum class TestMsg {
Ping,
Join,
Leave
};
int main()
{
UDPServer<Message<TestMsg>> server(60'000);
if (server.Start()) {
std::this_thread::sleep_for(3s);
{
Message<TestMsg> msg;
msg.id = TestMsg::Join;
msg << "hello PI equals " << M_PI << " in this world";
server.Send(msg, {{}, 60'001});
}
std::this_thread::sleep_for(27s);
}
}
出于某种原因,netcat 在 Coliru 上不能与 UDP 一起使用,所以这里有一个“现场”演示:
您可以看到我们的 netcat 客户端消息到达。您可以在 tcpdump 输出中看到发送到 60001 的消息。
我是 C++ 的新手,但到目前为止,大多数 asio 的东西都是有意义的。然而,我正在努力让我的 UDPServer 工作。
我的问题可能类似于:
我想我的 UDPServer 在工作可以交给它 io_context 之前就停止了。但是,我在调用 io_context.run() 之前向上下文发布工作,所以我不明白为什么。
当然,我不完全确定我的上述陈述是否正确,希望得到一些指导。这是我的 class:
template<typename message_T>
class UDPServer
{
public:
UDPServer(uint16_t port)
: m_socket(m_asioContext, asio::ip::udp::endpoint(asio::ip::udp::v4(), port))
{
m_port = port;
}
virtual ~UDPServer()
{
Stop();
}
public:
// Starts the server!
bool Start()
{
try
{
// Issue a task to the asio context
WaitForMessages();
m_threadContext = std::thread([this]() { m_asioContext.run(); });
}
catch (std::exception& e)
{
// Something prohibited the server from listening
std::cerr << "[SERVER @ PORT " << m_port << "] Exception: " << e.what() << "\n";
return false;
}
std::cout << "[SERVER @ PORT " << m_port << "] Started!\n";
return true;
}
// Stops the server!
void Stop()
{
// Request the context to close
m_asioContext.stop();
// Tidy up the context thread
if (m_threadContext.joinable()) m_threadContext.join();
// Inform someone, anybody, if they care...
std::cout << "[SERVER @ PORT " << m_port << "] Stopped!\n";
}
void WaitForMessages()
{
m_socket.async_receive_from(asio::buffer(vBuffer.data(), vBuffer.size()), m_endpoint,
[this](std::error_code ec, std::size_t length)
{
if (!ec)
{
std::cout << "[SERVER @ PORT " << m_port << "] Got " << length << " bytes \n Data: " << vBuffer.data() << "\n" << "Address: " << m_endpoint.address() << " Port: " << m_endpoint.port() << "\n" << "Data: " << m_endpoint.data() << "\n";
}
else
{
std::cerr << "[SERVER @ PORT " << m_port << "] Exception: " << ec.message() << "\n";
return;
}
WaitForMessages();
}
);
}
void Send(message_T& msg, const asio::ip::udp::endpoint& ep)
{
asio::post(m_asioContext,
[this, msg, ep]()
{
// If the queue has a message in it, then we must
// assume that it is in the process of asynchronously being written.
bool bWritingMessage = !m_messagesOut.empty();
m_messagesOut.push_back(msg);
if (!bWritingMessage)
{
WriteMessage(ep);
}
}
);
}
private:
void WriteMessage(const asio::ip::udp::endpoint& ep)
{
m_socket.async_send_to(asio::buffer(&m_messagesOut.front(), sizeof(message_T)), ep,
[this, ep](std::error_code ec, std::size_t length)
{
if (!ec)
{
m_messagesOut.pop_front();
// If the queue is not empty, there are more messages to send, so
// make this happen by issuing the task to send the next header.
if (!m_messagesOut.empty())
{
WriteMessage(ep);
}
}
else
{
std::cout << "[SERVER @ PORT " << m_port << "] Write Header Fail.\n";
m_socket.close();
}
});
}
void ReadMessage()
{
}
private:
uint16_t m_port = 0;
asio::ip::udp::endpoint m_endpoint;
std::vector<char> vBuffer = std::vector<char>(21);
protected:
TSQueue<message_T> m_messagesIn;
TSQueue<message_T> m_messagesOut;
Message<message_T> m_tempMessageBuf;
asio::io_context m_asioContext;
std::thread m_threadContext;
asio::ip::udp::socket m_socket;
};
}
暂时在主函数中调用代码:
enum class TestMsg {
Ping,
Join,
Leave
};
int main() {
Message<TestMsg> msg; // Message is a pretty basic struct that I'm not using yet. When I was, I was only receiving the first 4 bytes - which led me down this path of investigation
msg.id = TestMsg::Join;
msg << "hello";
UDPServer<Message<TestMsg>> server(60000);
}
调用时,服务器会在有机会打印“[SERVER] 已启动”之前立即退出
我会尝试按照 link post 的描述添加工作警卫,但我仍然想了解为什么 io_context 没有足够快地准备好工作。
Update(现在我也看了题而不仅仅是代码)
在 WaitForMessages
中,您确实通过调用 m_socket.async_receive_from
函数开始监听,因为它是异步的,该函数将在设置监听后立即 return/unblock 。因此,只要您实际上没有客户端向您发送内容,您的服务器就无能为力。只有当它收到一些东西时,回调才会被调用 io_context::run
的线程调用。所以你需要工作守卫,这样你的线程 运行ning run
不会在启动后立即解除阻塞,但只要工作守卫在那里就会阻塞。
如果在处理程序中抛出异常并且您仍想继续使用服务器,通常它还会与 try/while 模式结合使用。
同样在您发布的代码中,您实际上从未调用过 UDPServer::Start
!
这是我对答案的第一个想法:
This is normal behavior of ASIO. io_context::run
函数将在没有工作要做时立即 return。
所以要改变 run
函数的行为来阻止你必须使用 boost::asio::executor_work_guard<boost::asio::io_context::executor_type>
即所谓的工作守卫。使用对 io_context
的引用构造该对象并保存它,即不要让它破坏,只要你想让服务器 运行,即不想让 io_context::run
return没有工作的时候。
如此给出
boost::asio::io_context io_context_;
boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work_guard_;
然后你可以调用
work_guard_{boost::asio::make_work_guard(io_context_)},
const auto thread_count{std::max<unsigned>(std::thread::hardware_concurrency(), 1)};
std::generate_n(std::back_inserter(this->io_run_threads_),
thread_count,
[this]() {
return std::thread{io_run_loop,
std::ref(this->io_context_), std::ref(this->error_handler_)};
});
void io_run_loop(boost::asio::io_context &context,
const std::function<void(std::exception &)> &error_handler) {
while (true) {
try {
context.run();
break;
} catch (std::exception &e) {
error_handler(e);
}
}
}
然后关闭服务器:
work_guard_.reset();
io_context_.stop();
std::for_each(this->io_run_threads_.begin(), this->io_run_threads_.end(), [](auto &thread) {
if (thread.joinable()) thread.join();
});
为了更优雅地关闭,您可以省略 stop
调用,而是先关闭所有套接字。
你好像忘了打电话给 server.Start();
。此外,您需要让主线程等待一段时间,否则 Server
的析构函数将立即导致 Stop()
被调用:
int main()
{
Message<TestMsg> msg;
msg.id = TestMsg::Join;
msg << "hello";
UDPServer<Message<TestMsg>> server(60000);
server.Start();
std::this_thread::sleep_for(30s);
}
问题
Send
API 存在概念性问题。 它在每次调用时都需要一个端点,但它只使用启动写入调用链的端点!这意味着如果你这样做srv.Send(msg1, {mymachine, 60001}); srv.Send(msg1, {otherserver, 5517});
很可能他们都被发送到 mymachine:60001。
您如何处理收到的缓冲区。只是盲目地使用
.data()
假设数据是 NUL 终止的。不要那样做:std::string const data(vBuffer.data(), length);
此外,您似乎有时对数据和打印感到困惑
m_endpoint.data()
- 您的公主在另一座城堡中。实际上,您可能想要提取键入数据的方法。我将其保留在今天这个问题的范围之外。
无论如何,您应该在重新使用之前清除缓冲区,因为您可能会在后续读取中看到旧数据。
vBuffer.assign(vBuffer.size(), '[=13=]');
这很有可能undefined behaviour:
asio::buffer(&m_messagesOut.front(), sizeof(message_T)), ep,
这仅在
message_T
是微不足道的标准布局(“POD”- 普通旧数据)时才有效。operator<<
的存在强烈表明情况并非如此。相反,构建一个(序列)缓冲区,将消息表示为原始字节,例如
auto& msg = m_messagesOut.front(); msg.length = msg.body.size(); m_socket.async_send_to( std::vector<asio::const_buffer>{ asio::buffer(&msg.id, sizeof(msg.id)), asio::buffer(&msg.length, sizeof(msg.length)), asio::buffer(msg.body), }, // ...
线程安全队列似乎有点矫枉过正,因为您只有一个服务线程;这是一个隐含的“链”,因此您可以 post 它具有单线程语义。
这里有一些改编以使其到目前为止有效(除了 reader 指出的练习):
#include <boost/asio.hpp>
#include <iostream>
#include <deque>
#include <sstream>
// Library facilities
namespace asio = boost::asio;
using asio::ip::udp;
using boost::system::error_code;
using namespace std::chrono_literals;
/////////////////////////////////
// mock ups:
template <typename message_T> struct Message {
message_T id;
uint16_t length; // automatically filled on send, UDP packets are < 64k
std::string body;
template <typename T> friend Message& operator<<(Message& m, T const& v)
{
std::ostringstream oss;
oss << v;
m.body += oss.str();
//m.body += '[=16=]'; // suggestion for easier message extraction
return m;
}
};
// Thread-safety can be replaced with the implicit strand of a single service
// thread
template <typename T> using TSQueue = std::deque<T>;
// end mock ups
/////////////////////////////////
template <typename message_T> class UDPServer {
public:
UDPServer(uint16_t port)
: m_socket(m_asioContext, udp::endpoint(udp::v4(), port))
{
m_port = port;
}
virtual ~UDPServer() { Stop(); }
public:
// Starts the server!
bool Start()
{
if (m_threadContext.joinable() && !m_asioContext.stopped())
return false;
try {
// Issue a task to the asio context
WaitForMessages();
m_threadContext = std::thread([this]() { m_asioContext.run(); });
} catch (std::exception const& e) {
// Something prohibited the server from listening
std::cerr << "[SERVER @ PORT " << m_port
<< "] Exception: " << e.what() << "\n";
return false;
}
std::cout << "[SERVER @ PORT " << m_port << "] Started!\n";
return true;
}
// Stops the server!
void Stop()
{
// Tell the context to stop processing
m_asioContext.stop();
// Tidy up the context thread
if (m_threadContext.joinable())
m_threadContext.join();
// Inform someone, anybody, if they care...
std::cout << "[SERVER @ PORT " << m_port << "] Stopped!\n";
m_asioContext
.reset(); // required in case you want to reuse this Server object
}
void Send(message_T& msg, const udp::endpoint& ep)
{
asio::post(m_asioContext, [this, msg, ep]() {
// If the queue has a message in it, then we must
// assume that it is in the process of asynchronously being written.
bool bWritingMessage = !m_messagesOut.empty();
m_messagesOut.push_back(msg);
if (!bWritingMessage) {
WriteMessage(ep);
}
});
}
private:
void WaitForMessages() // assumed to be on-strand
{
vBuffer.assign(vBuffer.size(), '[=16=]');
m_socket.async_receive_from(
asio::buffer(vBuffer.data(), vBuffer.size()), m_endpoint,
[this](std::error_code ec, std::size_t length) {
if (!ec) {
std::string const data(vBuffer.data(), length);
std::cout << "[SERVER @ PORT " << m_port << "] Got "
<< length << " bytes \n Data: " << data << "\n"
<< "Address: " << m_endpoint.address()
<< " Port: " << m_endpoint.port() << "\n"
<< std::endl;
} else {
std::cerr << "[SERVER @ PORT " << m_port
<< "] Exception: " << ec.message() << "\n";
return;
}
WaitForMessages();
});
}
void WriteMessage(const udp::endpoint& ep)
{
auto& msg = m_messagesOut.front();
msg.length = msg.body.size();
m_socket.async_send_to(
std::vector<asio::const_buffer>{
asio::buffer(&msg.id, sizeof(msg.id)),
asio::buffer(&msg.length, sizeof(msg.length)),
asio::buffer(msg.body),
},
ep, [this, ep](std::error_code ec, std::size_t length) {
if (!ec) {
m_messagesOut.pop_front();
// If the queue is not empty, there are more messages to
// send, so make this happen by issuing the task to send the
// next header.
if (!m_messagesOut.empty()) {
WriteMessage(ep);
}
} else {
std::cout << "[SERVER @ PORT " << m_port
<< "] Write Header Fail.\n";
m_socket.close();
}
});
}
private:
uint16_t m_port = 0;
udp::endpoint m_endpoint;
std::vector<char> vBuffer = std::vector<char>(21);
protected:
TSQueue<message_T> m_messagesIn;
TSQueue<message_T> m_messagesOut;
Message<message_T> m_tempMessageBuf;
asio::io_context m_asioContext;
std::thread m_threadContext;
udp::socket m_socket;
};
enum class TestMsg {
Ping,
Join,
Leave
};
int main()
{
UDPServer<Message<TestMsg>> server(60'000);
if (server.Start()) {
std::this_thread::sleep_for(3s);
{
Message<TestMsg> msg;
msg.id = TestMsg::Join;
msg << "hello PI equals " << M_PI << " in this world";
server.Send(msg, {{}, 60'001});
}
std::this_thread::sleep_for(27s);
}
}
出于某种原因,netcat 在 Coliru 上不能与 UDP 一起使用,所以这里有一个“现场”演示:
您可以看到我们的 netcat 客户端消息到达。您可以在 tcpdump 输出中看到发送到 60001 的消息。