Asio 点对点网络编程
Asio Peer to Peer Network programming
我正在研究套接字的 Asio 文档,但找不到任何关于如何处理以下情况的有用信息:
我假设在对等网络中有很多服务器(最多 1000 个)。
服务器必须定期相互通信,所以我不想在每次需要时都打开一个新的客户端连接来向另一台服务器发送消息(巨大的开销)。
同时,创建 n 个线程,每个线程对应一个客户端 -> 服务器连接也不太可行。
我将实施不同的通信方案(全对全、星形和树形),因此服务器 1、log(n) 和 n 将必须实例化这 n 个套接字客户端以创建到另一个的连接服务器。
有没有简单的好方法(伪代码)
pool = ConnectionPool.create(vector<IP>);
pool.sendMessage(ip, message);
我知道在服务器端我可以使用异步连接。但是,我真的不知道如何从 C++/Asio 中的 "client"(发件人)角度处理它。
Tl:DR;
Which APIs and classes am I supposed to use when I want to "send" messages to N servers without having to open N connections every time I do that and neither using N threads".
既然你说你想使用 TCP 即基于连接的协议,你可以使用异步 ASIO API 并且可以依赖 1 个线程,因为异步即反应器模式调用不会阻塞。
您的服务器将使用 boost::asio::async_write
到 boost::asio::ip::tcp::socket
,这等于发生一个 TCP 连接。当你完成发送时,你给 async_write
作为参数的回调将被调用,但是 async_write
会立即 return。接收类似于客户端。为了获得与传入客户端的 TCP 连接,您必须使用 boost::asio::ip::tcp::resolver
,它通过客户端中的 boost::asio::ip::tcp::resolver::async_resolve
和 boost::asio::ip::tcp::acceptor
为您打开新的 TCP connections/sockets在服务器端用 boost::asio::ip::tcp::endpoint
和 boost::asio::ip::tcp::acceptor::async_accept
初始化。实际上,您需要 2 个,一个用于 IPv4,一个用于 IPv6。
由于在服务器端有一些状态与 TCP 连接,您通常必须在中心位置进行跟踪,但为了避免这种争用并简化模式,通常使用 class它继承了 std::enable_shared_from_this
,它将自身的 std::shared_pointer
赋给 std::async_write
的回调,这样,在发送和接收之间,线程在通常意义上没有被阻塞,它不会被遗忘,即删除。
对于阅读,我推荐 boost::asio::async_read_until
,一般来说 boost::asio::streambuf
。
通过这 1 个在循环中运行 boost::asio::io_context::run
的线程就足够了,每当许多连接之一需要处理接收到的东西或必须生成要发送的新东西时,它就会解除阻塞。
一般的项目有点超出范围,如果你能缩小你的问题一点,或者更好地阅读会谈和例子,将会有所帮助。我写了一些与你缩进类似的东西,一个弹性覆盖网络:https://github.com/Superlokkus/code
Yes, each process will need a server side (to receive messages from any of the n participants) and one client side (to send messages to any of the n participants). However, as far as I could find in Asio, the only way to send messages to k of the n participants is by creating k threads with k connections
那你一定是没看对地方,或者根本没看多远。
核心原则异步 IO 是在单个线程上多路复用 IO(所有 kqueue/epoll/select/IO 完成端口等抽象都针对该目标)。
这是一个绝对延迟编码的演示,显示:
- 单线程一切
- 接受无限客户端的侦听器(我们可以轻松添加额外的侦听器)
- 我们连接到 "peers"
的集合
在心跳间隔,我们向所有对等方发送心跳消息
for (auto& peer : peers)
async_write(peer, buffer(message), [ep=peer.remote_endpoint(ec)](error_code ec, size_t xfr) {
std::cout << "(sent " << xfr << " bytes to " << ep << "(" << ec.message() << ")" << std::endl;
});
- 此外,它还处理异步进程信号(INT、TERM)以关闭所有异步操作
#include <boost/asio.hpp>
#include <list>
#include <iostream>
using std::tuple;
using namespace std::literals;
template <typename T>
static auto reference_eq(T const& obj) {
return [p=&obj](auto& ref) { return &ref == p; };
}
int main() {
using namespace boost::asio; // don't be this lazy please
using boost::system::error_code;
using ip::tcp;
io_context ioc;
tcp::acceptor listener(ioc, {{}, 6868});
listener.set_option(tcp::acceptor::reuse_address(true));
listener.listen();
using Loop = std::function<void()>;
std::list<tcp::socket> clients, peers;
// accept unbounded clients
Loop accept_loop = [&] {
listener.async_accept([&](error_code const& ec, tcp::socket s) {
if (!ec) {
std::cout << "New session " << s.remote_endpoint() << std::endl;
clients.push_back(std::move(s));
accept_loop();
}
});
};
tcp::resolver resoler(ioc);
for (auto [host,service] : {
tuple{"www.example.com", "http"},
{"localhost", "6868"},
{"::1", "6868"},
// ...
})
{
auto& p = peers.emplace_back(ioc);
async_connect(p, resoler.resolve(host,service), [&,spec=(host+":"s+service)](error_code ec, auto...) {
std::cout << "For " << spec << " (" << ec.message() << ")";
if (!ec)
std::cout << " " << p.remote_endpoint();
else
peers.remove_if(reference_eq(p));
std::cout << std::endl;
});
}
std::string const& message = "heartbeat\n";
high_resolution_timer timer(ioc);
Loop heartbeat = [&]() mutable {
timer.expires_from_now(2s);
timer.async_wait([&](error_code ec) {
std::cout << "heartbeat " << ec.message() << std::endl;
if (ec)
return;
for (auto& peer : peers)
async_write(peer, buffer(message), [ep=peer.remote_endpoint(ec)](error_code ec, size_t xfr) {
std::cout << "(sent " << xfr << " bytes to " << ep << "(" << ec.message() << ")" << std::endl;
});
heartbeat();
});
};
signal_set sigs(ioc, SIGINT, SIGTERM);
sigs.async_wait([&](error_code ec, int sig) {
if (!ec) {
std::cout << "signal: " << strsignal(sig) << std::endl;
listener.cancel();
timer.cancel();
} });
accept_loop();
heartbeat();
ioc.run_for(10s); // max time for Coliru, or just `run()`
}
打印(在我的系统上):
New session 127.0.0.1:46730
For localhost:6868 (Success) 127.0.0.1:6868
For ::1:6868 (Connection refused)
For www.example.com:http (Success) 93.184.216.34:80
heartbeat Success
(sent 10 bytes to 93.184.216.34:80(Success)
(sent 10 bytes to 127.0.0.1:6868(Success)
heartbeat Success
(sent 10 bytes to 93.184.216.34:80(Success)
(sent 10 bytes to 127.0.0.1:6868(Success)
heartbeat Success
(sent 10 bytes to 93.184.216.34:80(Success)
(sent 10 bytes to 127.0.0.1:6868(Success)
^Csignal: Interrupt
heartbeat Operation canceled
Note how the one client ("New session") is our own peer connection on localhost:6868 :)
当然,在现实生活中你会有一个 class 来表示一个客户端会话,可能有等待发送消息的队列,并且可选地 运行 在多个线程上(使用 strand
s 以同步对共享对象的访问)。
其他示例
如果您真的希望避免显式收集客户,请参阅这个非常相似的演示: 其中
- 也是从单线程开始,只是为了strand演示的目的添加了一个线程池)
- 每个会话都有一个心跳计时器,这意味着每个会话都可以有自己的频率
¹ 由于网络访问受限,它不适用于 coliru。不使用解析器的仅环回版本有效:Live On Coliru
我正在研究套接字的 Asio 文档,但找不到任何关于如何处理以下情况的有用信息:
我假设在对等网络中有很多服务器(最多 1000 个)。 服务器必须定期相互通信,所以我不想在每次需要时都打开一个新的客户端连接来向另一台服务器发送消息(巨大的开销)。
同时,创建 n 个线程,每个线程对应一个客户端 -> 服务器连接也不太可行。
我将实施不同的通信方案(全对全、星形和树形),因此服务器 1、log(n) 和 n 将必须实例化这 n 个套接字客户端以创建到另一个的连接服务器。
有没有简单的好方法(伪代码)
pool = ConnectionPool.create(vector<IP>);
pool.sendMessage(ip, message);
我知道在服务器端我可以使用异步连接。但是,我真的不知道如何从 C++/Asio 中的 "client"(发件人)角度处理它。
Tl:DR;
Which APIs and classes am I supposed to use when I want to "send" messages to N servers without having to open N connections every time I do that and neither using N threads".
既然你说你想使用 TCP 即基于连接的协议,你可以使用异步 ASIO API 并且可以依赖 1 个线程,因为异步即反应器模式调用不会阻塞。
您的服务器将使用 boost::asio::async_write
到 boost::asio::ip::tcp::socket
,这等于发生一个 TCP 连接。当你完成发送时,你给 async_write
作为参数的回调将被调用,但是 async_write
会立即 return。接收类似于客户端。为了获得与传入客户端的 TCP 连接,您必须使用 boost::asio::ip::tcp::resolver
,它通过客户端中的 boost::asio::ip::tcp::resolver::async_resolve
和 boost::asio::ip::tcp::acceptor
为您打开新的 TCP connections/sockets在服务器端用 boost::asio::ip::tcp::endpoint
和 boost::asio::ip::tcp::acceptor::async_accept
初始化。实际上,您需要 2 个,一个用于 IPv4,一个用于 IPv6。
由于在服务器端有一些状态与 TCP 连接,您通常必须在中心位置进行跟踪,但为了避免这种争用并简化模式,通常使用 class它继承了 std::enable_shared_from_this
,它将自身的 std::shared_pointer
赋给 std::async_write
的回调,这样,在发送和接收之间,线程在通常意义上没有被阻塞,它不会被遗忘,即删除。
对于阅读,我推荐 boost::asio::async_read_until
,一般来说 boost::asio::streambuf
。
通过这 1 个在循环中运行 boost::asio::io_context::run
的线程就足够了,每当许多连接之一需要处理接收到的东西或必须生成要发送的新东西时,它就会解除阻塞。
一般的项目有点超出范围,如果你能缩小你的问题一点,或者更好地阅读会谈和例子,将会有所帮助。我写了一些与你缩进类似的东西,一个弹性覆盖网络:https://github.com/Superlokkus/code
Yes, each process will need a server side (to receive messages from any of the n participants) and one client side (to send messages to any of the n participants). However, as far as I could find in Asio, the only way to send messages to k of the n participants is by creating k threads with k connections
那你一定是没看对地方,或者根本没看多远。
核心原则异步 IO 是在单个线程上多路复用 IO(所有 kqueue/epoll/select/IO 完成端口等抽象都针对该目标)。
这是一个绝对延迟编码的演示,显示:
- 单线程一切
- 接受无限客户端的侦听器(我们可以轻松添加额外的侦听器)
- 我们连接到 "peers" 的集合
在心跳间隔,我们向所有对等方发送心跳消息
for (auto& peer : peers) async_write(peer, buffer(message), [ep=peer.remote_endpoint(ec)](error_code ec, size_t xfr) { std::cout << "(sent " << xfr << " bytes to " << ep << "(" << ec.message() << ")" << std::endl; });
- 此外,它还处理异步进程信号(INT、TERM)以关闭所有异步操作
#include <boost/asio.hpp>
#include <list>
#include <iostream>
using std::tuple;
using namespace std::literals;
template <typename T>
static auto reference_eq(T const& obj) {
return [p=&obj](auto& ref) { return &ref == p; };
}
int main() {
using namespace boost::asio; // don't be this lazy please
using boost::system::error_code;
using ip::tcp;
io_context ioc;
tcp::acceptor listener(ioc, {{}, 6868});
listener.set_option(tcp::acceptor::reuse_address(true));
listener.listen();
using Loop = std::function<void()>;
std::list<tcp::socket> clients, peers;
// accept unbounded clients
Loop accept_loop = [&] {
listener.async_accept([&](error_code const& ec, tcp::socket s) {
if (!ec) {
std::cout << "New session " << s.remote_endpoint() << std::endl;
clients.push_back(std::move(s));
accept_loop();
}
});
};
tcp::resolver resoler(ioc);
for (auto [host,service] : {
tuple{"www.example.com", "http"},
{"localhost", "6868"},
{"::1", "6868"},
// ...
})
{
auto& p = peers.emplace_back(ioc);
async_connect(p, resoler.resolve(host,service), [&,spec=(host+":"s+service)](error_code ec, auto...) {
std::cout << "For " << spec << " (" << ec.message() << ")";
if (!ec)
std::cout << " " << p.remote_endpoint();
else
peers.remove_if(reference_eq(p));
std::cout << std::endl;
});
}
std::string const& message = "heartbeat\n";
high_resolution_timer timer(ioc);
Loop heartbeat = [&]() mutable {
timer.expires_from_now(2s);
timer.async_wait([&](error_code ec) {
std::cout << "heartbeat " << ec.message() << std::endl;
if (ec)
return;
for (auto& peer : peers)
async_write(peer, buffer(message), [ep=peer.remote_endpoint(ec)](error_code ec, size_t xfr) {
std::cout << "(sent " << xfr << " bytes to " << ep << "(" << ec.message() << ")" << std::endl;
});
heartbeat();
});
};
signal_set sigs(ioc, SIGINT, SIGTERM);
sigs.async_wait([&](error_code ec, int sig) {
if (!ec) {
std::cout << "signal: " << strsignal(sig) << std::endl;
listener.cancel();
timer.cancel();
} });
accept_loop();
heartbeat();
ioc.run_for(10s); // max time for Coliru, or just `run()`
}
打印(在我的系统上):
New session 127.0.0.1:46730
For localhost:6868 (Success) 127.0.0.1:6868
For ::1:6868 (Connection refused)
For www.example.com:http (Success) 93.184.216.34:80
heartbeat Success
(sent 10 bytes to 93.184.216.34:80(Success)
(sent 10 bytes to 127.0.0.1:6868(Success)
heartbeat Success
(sent 10 bytes to 93.184.216.34:80(Success)
(sent 10 bytes to 127.0.0.1:6868(Success)
heartbeat Success
(sent 10 bytes to 93.184.216.34:80(Success)
(sent 10 bytes to 127.0.0.1:6868(Success)
^Csignal: Interrupt
heartbeat Operation canceled
Note how the one client ("New session") is our own peer connection on localhost:6868 :)
当然,在现实生活中你会有一个 class 来表示一个客户端会话,可能有等待发送消息的队列,并且可选地 运行 在多个线程上(使用 strand
s 以同步对共享对象的访问)。
其他示例
如果您真的希望避免显式收集客户,请参阅这个非常相似的演示:
- 也是从单线程开始,只是为了strand演示的目的添加了一个线程池)
- 每个会话都有一个心跳计时器,这意味着每个会话都可以有自己的频率
¹ 由于网络访问受限,它不适用于 coliru。不使用解析器的仅环回版本有效:Live On Coliru