外部服务器的 C++ 客户端和外部客户端的服务器同时使用 boost::asio 协程
C++ Client for external server & Server for external clients simultaneously with boost::asio coroutine
是否可以创建一个单线程进程,让客户端和服务器在其中并行运行并相互通信?
我有一个场景,我需要既是客户端又是服务器,我们可以receive/send任何可能方向的数据。
- Server (PORT A) -> APP(client-> process message and send to ->server (PORT B)) -> ClientA
- Server (PORT A) <- APP(client<- process message and send to <-server (PORT B)) <- ClientA
- Server (PORT A) -> APP(client-> process message and send to to Server (PORT A)
- ClientA -> APP(server(PORT B) -> 处理消息并发送到客户端 A)
我一直在尝试修改 boost::asio 协程的官方示例,在同一个 io_context 或单独的 io_context 中添加客户端,但始终没有成功。
我试过将客户端和服务器放在不同的线程中,但我也遇到了并发问题...任何想法或片段将不胜感激。
I have an scenario where I need to be both client and server and we can receive/send data in any possible direction.
这是网络应用程序的标准配置。
聊天服务器似乎是一个很好的例子(因为它在所有方向上都 send/receive):https://www.boost.org/doc/libs/1_75_0/doc/html/boost_asio/examples/cpp11_examples.html#boost_asio.examples.cpp11_examples.chat
现在,作为也启动传出连接的服务器的准系统示例:
让我们创建一个服务器,为每个接受的连接生成一个异步会话。
每个会话都是一个“反向回显”服务。
// echo server, multi-client
// Accept loop: each accepted connection gets its own coroutine ("session")
// that reads one '\n'-terminated line at a time and writes it back reversed.
spawn(io, [&io](ba::yield_context yc) {
    tcp::acceptor acc(io, {{}, 6868});
    // NOTE(review): the endpoint-taking acceptor constructor appears to enable
    // reuse_address by default already; confirm before removing this call.
    acc.set_option(tcp::acceptor::reuse_address(true));

    auto num_clients = 0; // gives every session a distinct log prefix
    while (true) {
        tcp::socket s(io);
        acc.async_accept(s, yc);

        // The socket is moved into the session coroutine, which owns it from here on.
        // The session captures its own logger; the snippet previously called an
        // undeclared `log`.
        spawn(yc, [s = std::move(s), log = logger("session #" + std::to_string(num_clients++))]
                  (ba::yield_context yc) mutable {
            log("Connection from ", s.remote_endpoint());

            std::string msg;
            boost::system::error_code ec;
            while (true) {
                // yc[ec] reports failures through ec instead of throwing, so a
                // peer closing the connection (EOF) just ends this session.
                auto const n = async_read_until(s, ba::dynamic_buffer(msg), "\n", yc[ec]);
                if (ec)
                    break;

                // n counts the delimiter; the view excludes the trailing '\n'.
                std::string_view const vw(msg.data(), n - 1);
                log("Responding to ", std::quoted(vw));

                // Reverse the line in place, keeping '\n' at the end.
                std::reverse(msg.data(), msg.data() + vw.size());
                async_write(s, ba::buffer(msg, n), yc[ec]);
                if (ec)
                    break;

                // Drop the answered line; bytes read past the delimiter stay queued.
                msg.erase(0, n);
            }

            if (ec != ba::error::eof)
                log("Session ended: ", ec.message());
        });
    }
});
同时,我们运行 5 个客户端。如果没有外部服务器,我们就让它们连接到我们自己的服务器。
这允许我们有一个独立的演示,并将证明单线程不会导致任何阻塞。
// a random client, let's make it connect to our own server, just for this demo
// Five independent client coroutines; each one resolves and connects once,
// then repeatedly waits a random delay, sends a greeting line and logs the reply.
for (auto client_id = 0; client_id < 5; ++client_id) {
    auto client_log = logger("client #" + std::to_string(client_id));

    spawn(io, [&io, delay, client_id, log = std::move(client_log)](ba::yield_context yc) {
        tcp::resolver resolver(io);
        tcp::socket sock(io);

        auto const endpoints = resolver.async_resolve("127.0.0.1", "6868", yc);
        async_connect(sock, endpoints, yc);

        for (;;) {
            delay(yc);

            // Compose the request line in a streambuf and send it in full.
            ba::streambuf request;
            std::ostream(&request) << "Hello from client #" << client_id << "\n";
            async_write(sock, request, yc);

            // Read the reply up to the newline, then strip the last character.
            std::string reply;
            async_read_until(sock, ba::dynamic_buffer(reply), "\n", yc);
            if (!reply.empty()) {
                reply.pop_back();
            }
            log("Received response ", std::quoted(reply));
        }
    });
}
其中 delay 是一个随机延迟(500..1500ms):
auto delay = [&io](ba::yield_context yc) {
ba::steady_timer(io, 500ms + (prng() % 1000) * 1ms)
.async_wait(yc);
};
我们运行整个程序3秒,然后退出:
io.run_for(3s);
logger("main")("Bye");
输出:
at 0ms session #0 Connection from 127.0.0.1:51024
at 1ms session #1 Connection from 127.0.0.1:51026
at 1ms session #2 Connection from 127.0.0.1:51028
at 1ms session #3 Connection from 127.0.0.1:51030
at 1ms session #4 Connection from 127.0.0.1:51032
at 831ms session #3 Responding to "Hello from client #3"
at 831ms client #3 Received response "3# tneilc morf olleH"
at 1148ms session #4 Responding to "Hello from client #4"
at 1148ms client #4 Received response "4# tneilc morf olleH"
at 1196ms session #1 Responding to "Hello from client #1"
at 1196ms client #1 Received response "1# tneilc morf olleH"
at 1327ms session #0 Responding to "Hello from client #0"
at 1327ms client #0 Received response "0# tneilc morf olleH"
at 1401ms session #2 Responding to "Hello from client #2"
at 1401ms client #2 Received response "2# tneilc morf olleH"
at 1446ms session #3 Responding to "Hello from client #3"
at 1446ms client #3 Received response "3# tneilc morf olleH"
at 1836ms session #4 Responding to "Hello from client #4"
at 1836ms client #4 Received response "4# tneilc morf olleH"
at 2163ms session #0 Responding to "Hello from client #0"
at 2163ms client #0 Received response "0# tneilc morf olleH"
at 2382ms session #2 Responding to "Hello from client #2"
at 2383ms client #2 Received response "2# tneilc morf olleH"
at 2426ms session #3 Responding to "Hello from client #3"
at 2426ms client #3 Received response "3# tneilc morf olleH"
at 2444ms session #4 Responding to "Hello from client #4"
at 2444ms client #4 Received response "4# tneilc morf olleH"
at 2579ms session #1 Responding to "Hello from client #1"
at 2580ms client #1 Received response "1# tneilc morf olleH"
at 3002ms main Bye
完整列表
#include <boost/asio/detail/handler_alloc_helpers.hpp>
#include <boost/system/system_error.hpp>
#define BOOST_BIND_NO_PLACEHOLDERS
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <iostream>
#include <iomanip>
#include <thread>
#include <chrono>
#include <random>
namespace ba = boost::asio;
using ba::ip::tcp;
using namespace std::literals;

// `now` points at the monotonic clock's now(); `start` is sampled once during
// static initialization and is time zero for every log timestamp.
static auto const now = &std::chrono::steady_clock::now;
static auto const start = now();

/// Creates a logging callable bound to a fixed tag.
///
/// @param name  Tag printed on every line (e.g. "client #3").
/// @return A callable taking any number of streamable arguments. Each call
///         writes one line to std::cout: "at", the milliseconds elapsed since
///         `start` right-aligned in 6 columns, "ms", a tab, the tag, a tab,
///         all arguments in order, then a newline with a flush.
auto logger(std::string name) {
    return [name](auto const&... args) {
        using std::chrono::duration_cast;
        using std::chrono::milliseconds;
        auto const elapsed_ms = duration_cast<milliseconds>(now() - start).count();

        std::ostream& out = std::cout;
        out << "at" << std::setw(6) << elapsed_ms << "ms\t" << name << "\t";
        ((out << args), ...); // stream each argument, left to right
        out << std::endl;
    };
}
int main() {
ba::io_context io;
static std::mt19937 prng { std::random_device{}() };
// insert random async delays
auto delay = [&io](auto yc) {
ba::steady_timer(io, 500ms + (prng() % 1000) * 1ms)
.async_wait(yc);
};
// echo server, multi-client
spawn(io, [&io, log=logger("accept")](ba::yield_context yc) {
tcp::acceptor acc(io, {{}, 6868});
acc.set_option(tcp::acceptor::reuse_address(true));
auto num_clients = 0;
while (true) {
tcp::socket s(io);
acc.async_accept(s, yc);
spawn(yc, [s = std::move(s), log=logger("session #" + std::to_string(num_clients++))]
(ba::yield_context yc) mutable {
log("Connection from ", s.remote_endpoint());
std::string msg;
while (auto n = async_read_until(s, ba::dynamic_buffer(msg), "\n", yc)) {
std::string_view vw(msg.data(), n);
vw.remove_suffix(1); // leave '\n'
log("Responding to ", std::quoted(vw));
std::reverse(msg.data(), msg.data() + vw.size());
async_write(s, ba::buffer(msg, n), yc);
msg = msg.substr(n);
}
});
}
});
// a random client, let's make it connect to our own server, just for this demo
for (auto client_id = 0; client_id<5; ++client_id) {
spawn(io,
[&io, delay, client_id, log=logger("client #" + std::to_string(client_id))]
(ba::yield_context yc) {
tcp::resolver r(io);
tcp::socket s(io);
async_connect(s, r.async_resolve("127.0.0.1", "6868", yc), yc);
while (true) {
delay(yc);
ba::streambuf buf;
std::ostream(&buf)
<< "Hello from client #" << client_id << "\n";
async_write(s, buf, yc);
std::string response;
async_read_until(s, ba::dynamic_buffer(response), "\n", yc);
if (!response.empty())
response.pop_back();
log("Received response ", std::quoted(response));
}
});
}
io.run_for(3s);
logger("main")("Bye");
}
是否可以创建一个单线程进程,让客户端和服务器在其中并行运行并相互通信?
我有一个场景,我需要既是客户端又是服务器,我们可以receive/send任何可能方向的数据。
- Server (PORT A) -> APP(client-> process message and send to ->server (PORT B)) -> ClientA
- Server (PORT A) <- APP(client<- process message and send to <-server (PORT B)) <- ClientA
- Server (PORT A) -> APP(client-> process message and send to to Server (PORT A)
- ClientA -> APP(server(PORT B) -> 处理消息并发送到客户端 A)
我一直在尝试修改 boost::asio 协程的官方示例,在同一个 io_context 或单独的 io_context 中添加客户端,但始终没有成功。
我试过将客户端和服务器放在不同的线程中,但我也遇到了并发问题...任何想法或片段将不胜感激。
I have an scenario where I need to be both client and server and we can receive/send data in any possible direction.
这是网络应用程序的标准配置。
聊天服务器似乎是一个很好的例子(因为它在所有方向上都 send/receive):https://www.boost.org/doc/libs/1_75_0/doc/html/boost_asio/examples/cpp11_examples.html#boost_asio.examples.cpp11_examples.chat
现在,作为也启动传出连接的服务器的准系统示例:
让我们创建一个服务器,为每个接受的连接生成一个异步会话。每个会话都是一个“反向回显”服务。
// echo server, multi-client
// Accept loop: each accepted connection gets its own coroutine ("session")
// that reads one '\n'-terminated line at a time and writes it back reversed.
spawn(io, [&io](ba::yield_context yc) {
    tcp::acceptor acc(io, {{}, 6868});
    // NOTE(review): the endpoint-taking acceptor constructor appears to enable
    // reuse_address by default already; confirm before removing this call.
    acc.set_option(tcp::acceptor::reuse_address(true));

    auto num_clients = 0; // gives every session a distinct log prefix
    while (true) {
        tcp::socket s(io);
        acc.async_accept(s, yc);

        // The socket is moved into the session coroutine, which owns it from here on.
        // The session captures its own logger; the snippet previously called an
        // undeclared `log`.
        spawn(yc, [s = std::move(s), log = logger("session #" + std::to_string(num_clients++))]
                  (ba::yield_context yc) mutable {
            log("Connection from ", s.remote_endpoint());

            std::string msg;
            boost::system::error_code ec;
            while (true) {
                // yc[ec] reports failures through ec instead of throwing, so a
                // peer closing the connection (EOF) just ends this session.
                auto const n = async_read_until(s, ba::dynamic_buffer(msg), "\n", yc[ec]);
                if (ec)
                    break;

                // n counts the delimiter; the view excludes the trailing '\n'.
                std::string_view const vw(msg.data(), n - 1);
                log("Responding to ", std::quoted(vw));

                // Reverse the line in place, keeping '\n' at the end.
                std::reverse(msg.data(), msg.data() + vw.size());
                async_write(s, ba::buffer(msg, n), yc[ec]);
                if (ec)
                    break;

                // Drop the answered line; bytes read past the delimiter stay queued.
                msg.erase(0, n);
            }

            if (ec != ba::error::eof)
                log("Session ended: ", ec.message());
        });
    }
});
同时,我们运行 5 个客户端。如果没有外部服务器,我们就让它们连接到我们自己的服务器。
这允许我们有一个独立的演示,并将证明单线程不会导致任何阻塞。
// a random client, let's make it connect to our own server, just for this demo
// Five independent client coroutines; each one resolves and connects once,
// then repeatedly waits a random delay, sends a greeting line and logs the reply.
for (auto client_id = 0; client_id < 5; ++client_id) {
    auto client_log = logger("client #" + std::to_string(client_id));

    spawn(io, [&io, delay, client_id, log = std::move(client_log)](ba::yield_context yc) {
        tcp::resolver resolver(io);
        tcp::socket sock(io);

        auto const endpoints = resolver.async_resolve("127.0.0.1", "6868", yc);
        async_connect(sock, endpoints, yc);

        for (;;) {
            delay(yc);

            // Compose the request line in a streambuf and send it in full.
            ba::streambuf request;
            std::ostream(&request) << "Hello from client #" << client_id << "\n";
            async_write(sock, request, yc);

            // Read the reply up to the newline, then strip the last character.
            std::string reply;
            async_read_until(sock, ba::dynamic_buffer(reply), "\n", yc);
            if (!reply.empty()) {
                reply.pop_back();
            }
            log("Received response ", std::quoted(reply));
        }
    });
}
其中 delay 是一个随机延迟(500..1500ms):
auto delay = [&io](ba::yield_context yc) {
ba::steady_timer(io, 500ms + (prng() % 1000) * 1ms)
.async_wait(yc);
};
我们运行整个程序3秒,然后退出:
io.run_for(3s);
logger("main")("Bye");
输出:
at 0ms session #0 Connection from 127.0.0.1:51024
at 1ms session #1 Connection from 127.0.0.1:51026
at 1ms session #2 Connection from 127.0.0.1:51028
at 1ms session #3 Connection from 127.0.0.1:51030
at 1ms session #4 Connection from 127.0.0.1:51032
at 831ms session #3 Responding to "Hello from client #3"
at 831ms client #3 Received response "3# tneilc morf olleH"
at 1148ms session #4 Responding to "Hello from client #4"
at 1148ms client #4 Received response "4# tneilc morf olleH"
at 1196ms session #1 Responding to "Hello from client #1"
at 1196ms client #1 Received response "1# tneilc morf olleH"
at 1327ms session #0 Responding to "Hello from client #0"
at 1327ms client #0 Received response "0# tneilc morf olleH"
at 1401ms session #2 Responding to "Hello from client #2"
at 1401ms client #2 Received response "2# tneilc morf olleH"
at 1446ms session #3 Responding to "Hello from client #3"
at 1446ms client #3 Received response "3# tneilc morf olleH"
at 1836ms session #4 Responding to "Hello from client #4"
at 1836ms client #4 Received response "4# tneilc morf olleH"
at 2163ms session #0 Responding to "Hello from client #0"
at 2163ms client #0 Received response "0# tneilc morf olleH"
at 2382ms session #2 Responding to "Hello from client #2"
at 2383ms client #2 Received response "2# tneilc morf olleH"
at 2426ms session #3 Responding to "Hello from client #3"
at 2426ms client #3 Received response "3# tneilc morf olleH"
at 2444ms session #4 Responding to "Hello from client #4"
at 2444ms client #4 Received response "4# tneilc morf olleH"
at 2579ms session #1 Responding to "Hello from client #1"
at 2580ms client #1 Received response "1# tneilc morf olleH"
at 3002ms main Bye
完整列表
#include <boost/asio/detail/handler_alloc_helpers.hpp>
#include <boost/system/system_error.hpp>
#define BOOST_BIND_NO_PLACEHOLDERS
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <iostream>
#include <iomanip>
#include <thread>
#include <chrono>
#include <random>
namespace ba = boost::asio;
using ba::ip::tcp;
using namespace std::literals;

// `now` points at the monotonic clock's now(); `start` is sampled once during
// static initialization and is time zero for every log timestamp.
static auto const now = &std::chrono::steady_clock::now;
static auto const start = now();

/// Creates a logging callable bound to a fixed tag.
///
/// @param name  Tag printed on every line (e.g. "client #3").
/// @return A callable taking any number of streamable arguments. Each call
///         writes one line to std::cout: "at", the milliseconds elapsed since
///         `start` right-aligned in 6 columns, "ms", a tab, the tag, a tab,
///         all arguments in order, then a newline with a flush.
auto logger(std::string name) {
    return [name](auto const&... args) {
        using std::chrono::duration_cast;
        using std::chrono::milliseconds;
        auto const elapsed_ms = duration_cast<milliseconds>(now() - start).count();

        std::ostream& out = std::cout;
        out << "at" << std::setw(6) << elapsed_ms << "ms\t" << name << "\t";
        ((out << args), ...); // stream each argument, left to right
        out << std::endl;
    };
}
int main() {
ba::io_context io;
static std::mt19937 prng { std::random_device{}() };
// insert random async delays
auto delay = [&io](auto yc) {
ba::steady_timer(io, 500ms + (prng() % 1000) * 1ms)
.async_wait(yc);
};
// echo server, multi-client
spawn(io, [&io, log=logger("accept")](ba::yield_context yc) {
tcp::acceptor acc(io, {{}, 6868});
acc.set_option(tcp::acceptor::reuse_address(true));
auto num_clients = 0;
while (true) {
tcp::socket s(io);
acc.async_accept(s, yc);
spawn(yc, [s = std::move(s), log=logger("session #" + std::to_string(num_clients++))]
(ba::yield_context yc) mutable {
log("Connection from ", s.remote_endpoint());
std::string msg;
while (auto n = async_read_until(s, ba::dynamic_buffer(msg), "\n", yc)) {
std::string_view vw(msg.data(), n);
vw.remove_suffix(1); // leave '\n'
log("Responding to ", std::quoted(vw));
std::reverse(msg.data(), msg.data() + vw.size());
async_write(s, ba::buffer(msg, n), yc);
msg = msg.substr(n);
}
});
}
});
// a random client, let's make it connect to our own server, just for this demo
for (auto client_id = 0; client_id<5; ++client_id) {
spawn(io,
[&io, delay, client_id, log=logger("client #" + std::to_string(client_id))]
(ba::yield_context yc) {
tcp::resolver r(io);
tcp::socket s(io);
async_connect(s, r.async_resolve("127.0.0.1", "6868", yc), yc);
while (true) {
delay(yc);
ba::streambuf buf;
std::ostream(&buf)
<< "Hello from client #" << client_id << "\n";
async_write(s, buf, yc);
std::string response;
async_read_until(s, ba::dynamic_buffer(response), "\n", yc);
if (!response.empty())
response.pop_back();
log("Received response ", std::quoted(response));
}
});
}
io.run_for(3s);
logger("main")("Bye");
}