Is there a problem with the socket class I wrote around boost::asio to simplify networking?
I wrote a socket class that wraps all the asynchronous work with boost::asio. The goal is to reduce code: just inherit from this class and use its methods. Are there any flaws? I'm not sure whether the implementation has UB or bugs anywhere.
#include <boost/asio.hpp>
#include <memory>
#include <string>
#include <utility>
namespace network {
enum Type {
UDP,
TCP
};
template <typename socket_type, typename resolver_type, typename endpoint_iter_type>
struct SocketImpl : public std::enable_shared_from_this<SocketImpl<socket_type, resolver_type, endpoint_iter_type>> {
public:
typedef std::function<void()> ConnectCallback, PromoteCallback, PostCallback;
typedef std::function<void(size_t)> WriteCallback;
typedef std::function<void(const uint8_t *, size_t)> ReadCallback;
typedef std::function<void(const std::string &)> ErrorCallback;
explicit SocketImpl(const boost::asio::strand<typename boost::asio::io_service::executor_type> &executor)
: socket_(executor), resolver_(executor), timeout_(executor) {}
explicit SocketImpl(socket_type sock)
: resolver_(sock.get_executor()), timeout_(sock.get_executor()), socket_(std::move(sock)) {}
void Post(const PostCallback &callback);
auto Get() { return this->shared_from_this(); }
void Connect(std::string Host, std::string Port, const ConnectCallback &connect_callback, const ErrorCallback &error_callback);
virtual void Send(const uint8_t *message_data, size_t size, const WriteCallback &write_callback, const ErrorCallback &error_callback) = 0;
virtual void Read(size_t size, const ReadCallback &read_callback, const ErrorCallback &error_callback) = 0;
template <typename Handler> void Await(boost::posix_time::time_duration ms, Handler f);
virtual void Disconnect();
~SocketImpl();
protected:
void stop_await();
virtual void do_resolve(std::string host, std::string port, const SocketImpl::ConnectCallback &connect_callback,
const SocketImpl::ErrorCallback &error_callback) = 0;
void deadline();
resolver_type resolver_;
endpoint_iter_type endpoint_iter_;
socket_type socket_;
boost::asio::deadline_timer timeout_;
boost::asio::streambuf buff_;
};
template <Type t>
struct Socket
: public SocketImpl<boost::asio::ip::tcp::socket, boost::asio::ip::tcp::resolver, boost::asio::ip::tcp::resolver::iterator> {
explicit Socket(const boost::asio::strand<typename boost::asio::io_service::executor_type> &executor) : SocketImpl(executor) {}
explicit Socket(boost::asio::ip::tcp::socket sock) : SocketImpl(std::move(sock)) {
if (socket_.is_open())
is_connected = true;
}
void Send(const uint8_t *message_data, size_t size, const WriteCallback &write_callback, const ErrorCallback &error_callback) override {
auto self = Get();
Post([this, self, message_data, size, write_callback, error_callback] {
boost::asio::async_write(socket_, boost::asio::buffer(message_data, size),
[this, self, write_callback, error_callback](boost::system::error_code ec, std::size_t bytes_transferred) {
if (!ec) {
write_callback(bytes_transferred);
} else {
#ifdef OS_WIN
SetThreadUILanguage(MAKELANGID(LANG_ENGLISH, SUBLANG_ENGLISH_US));
#endif
error_callback(ec.message());
}
});
});
}
void Read(size_t size, const ReadCallback &read_callback, const ErrorCallback &error_callback) override {
auto self = Get();
Post([this, self, size, read_callback, error_callback] {
boost::asio::async_read(socket_, boost::asio::buffer(buff_.prepare(size)),
[this, self, read_callback, error_callback](boost::system::error_code ec, std::size_t length) {
stop_await();
if (!ec) {
const uint8_t *data = boost::asio::buffer_cast<const uint8_t *>(buff_.data());
read_callback(data, length);
} else {
#ifdef OS_WIN
SetThreadUILanguage(MAKELANGID(LANG_ENGLISH, SUBLANG_ENGLISH_US));
#endif
error_callback(ec.message());
}
buff_.consume(length);
});
});
}
bool IsConnected() const { return is_connected; }
void ReadUntil(std::string until_str, const ReadCallback &read_callback, const ErrorCallback &error_callback) {
auto self = Get();
Post([this, self, until_str = std::move(until_str), read_callback, error_callback] {
boost::asio::async_read_until(socket_, buff_, until_str,
[this, read_callback, error_callback](boost::system::error_code ec, std::size_t bytes_transferred) {
stop_await();
if (!ec) {
const uint8_t *data = boost::asio::buffer_cast<const uint8_t *>(buff_.data());
read_callback(data, bytes_transferred);
} else {
#ifdef OS_WIN
SetThreadUILanguage(MAKELANGID(LANG_ENGLISH, SUBLANG_ENGLISH_US));
#endif
error_callback(ec.message());
}
buff_.consume(bytes_transferred);
});
});
}
protected:
void do_resolve(std::string host, std::string port, const SocketImpl::ConnectCallback &connect_callback,
const SocketImpl::ErrorCallback &error_callback) override {
auto self = Get();
resolver_.async_resolve(host, port,
[this, self, connect_callback, error_callback](
boost::system::error_code ec, boost::asio::ip::tcp::resolver::iterator endpoints) {
stop_await();
if (!ec) {
endpoint_iter_ = std::move(endpoints);
do_connect(endpoint_iter_, connect_callback, error_callback);
} else {
#ifdef OS_WIN
SetThreadUILanguage(MAKELANGID(LANG_ENGLISH, SUBLANG_ENGLISH_US));
#endif
error_callback("Unable to resolve host: " + ec.message());
}
});
}
void do_connect(boost::asio::ip::tcp::resolver::iterator endpoints, const SocketImpl::ConnectCallback &connect_callback,
const SocketImpl::ErrorCallback &error_callback) {
auto self = Get();
boost::asio::async_connect(socket_, std::move(endpoints),
[this, self, connect_callback, error_callback](
boost::system::error_code ec, [[maybe_unused]] const boost::asio::ip::tcp::resolver::iterator &) {
stop_await();
if (!ec) {
connect_callback();
} else {
#ifdef OS_WIN
SetThreadUILanguage(MAKELANGID(LANG_ENGLISH, SUBLANG_ENGLISH_US));
#endif
error_callback("Unable to connect host: " + ec.message());
}
});
}
bool is_connected = false;
};
template <>
struct Socket<UDP>
: public SocketImpl<boost::asio::ip::udp::socket, boost::asio::ip::udp::resolver, boost::asio::ip::udp::resolver::iterator> {
public:
explicit Socket(const boost::asio::strand<typename boost::asio::io_service::executor_type> &executor) : SocketImpl(executor) {}
explicit Socket(boost::asio::ip::udp::socket sock) : SocketImpl(std::move(sock)) {}
void Send(const uint8_t *message_data, size_t size, const WriteCallback &write_callback, const ErrorCallback &error_callback) override {
auto self = Get();
Post([this, self, message_data, size, write_callback, error_callback] {
socket_.async_send_to(boost::asio::buffer(message_data, size), *endpoint_iter_,
[this, self, write_callback, error_callback](boost::system::error_code ec, size_t bytes_transferred) {
if (!ec) {
write_callback(bytes_transferred);
} else {
#ifdef OS_WIN
SetThreadUILanguage(MAKELANGID(LANG_ENGLISH, SUBLANG_ENGLISH_US));
#endif
error_callback(ec.message());
}
});
});
}
void Read(size_t size, const ReadCallback &read_callback, const ErrorCallback &error_callback) override {
auto self = Get();
Post([this, self, size, read_callback, error_callback] {
boost::asio::ip::udp::endpoint endpoint = *endpoint_iter_;
socket_.async_receive_from(boost::asio::buffer(buff_.prepare(size)), endpoint,
[this, self, read_callback, error_callback](boost::system::error_code ec, size_t bytes_transferred) {
stop_await();
if (!ec) {
const auto *data = boost::asio::buffer_cast<const uint8_t *>(buff_.data());
read_callback(data, bytes_transferred);
} else {
error_callback(ec.message());
}
buff_.consume(bytes_transferred);
});
});
}
void Promote(const PromoteCallback &callback);
protected:
void do_resolve(std::string host, std::string port, const SocketImpl::ConnectCallback &connect_callback,
const SocketImpl::ErrorCallback &error_callback) override {
auto self = Get();
resolver_.async_resolve(host, port,
[this, self, connect_callback, error_callback](
boost::system::error_code ec, boost::asio::ip::udp::resolver::iterator endpoints) {
stop_await();
if (!ec) {
endpoint_iter_ = std::move(endpoints);
boost::asio::ip::udp::endpoint endpoint = *endpoint_iter_;
socket_.open(endpoint.protocol());
connect_callback();
} else {
#ifdef OS_WIN
SetThreadUILanguage(MAKELANGID(LANG_ENGLISH, SUBLANG_ENGLISH_US));
#endif
error_callback("Unable to resolve host: " + ec.message());
}
});
}
};
void Socket<UDP>::Promote(const PromoteCallback &callback) {
auto self = Get();
Post([this, self, callback] {
endpoint_iter_++;
socket_.cancel();
callback();
});
}
template <typename socket_type, typename resolver_type, typename endpoint_iter_type>
void SocketImpl<socket_type, resolver_type, endpoint_iter_type>::Post(const SocketImpl::PostCallback &callback) {
post(socket_.get_executor(), callback);
}
template <typename socket_type, typename resolver_type, typename endpoint_iter_type>
void SocketImpl<socket_type, resolver_type, endpoint_iter_type>::Connect(std::string Host, std::string Port,
const SocketImpl::ConnectCallback &connect_callback, const SocketImpl::ErrorCallback &error_callback) {
auto self = Get();
Post([this, self, Host, Port, connect_callback, error_callback] { do_resolve(Host, Port, connect_callback, error_callback); });
}
template <typename socket_type, typename resolver_type, typename endpoint_iter_type>
template <typename Handler>
void SocketImpl<socket_type, resolver_type, endpoint_iter_type>::Await(boost::posix_time::time_duration ms, Handler f) {
auto self = Get();
Post([this, ms, self, f] {
timeout_.expires_from_now(ms);
timeout_.template async_wait([this, self, f](boost::system::error_code const &ec) {
if (!ec) {
deadline(f);
}
});
});
}
template <typename socket_type, typename resolver_type, typename endpoint_iter_type>
void SocketImpl<socket_type, resolver_type, endpoint_iter_type>::Disconnect() {
auto self = Get();
Post([this, self] {
#ifdef OS_WIN
SetThreadUILanguage(MAKELANGID(LANG_ENGLISH, SUBLANG_ENGLISH_US));
#endif
timeout_.cancel();
resolver_.cancel();
if (socket_.is_open()) socket_.cancel();
});
}
template <typename socket_type, typename resolver_type, typename endpoint_iter_type>
void SocketImpl<socket_type, resolver_type, endpoint_iter_type>::stop_await() {
timeout_.cancel();
}
template <typename socket_type, typename resolver_type, typename endpoint_iter_type>
void SocketImpl<socket_type, resolver_type, endpoint_iter_type>::deadline() {
if (timeout_.expires_at() <= boost::asio::deadline_timer::traits_type::now()) {
timeout_.cancel();
socket_.cancel();
} else {
auto self(Get());
timeout_.async_wait([this, self](boost::system::error_code ec) {
if (!ec) {
deadline();
}
});
}
}
template <typename socket_type, typename resolver_type, typename endpoint_iter_type>
SocketImpl<socket_type, resolver_type, endpoint_iter_type>::~SocketImpl() {
if (socket_.is_open()) socket_.close();
}
} // namespace network
I use it like this (C++17):
struct Client : Socket<TCP> { ... };
I'd be glad to get suggestions about this design! Thanks!
That's a lot of code.
Always compile with warnings enabled. That would have told you that members are not constructed in the order in which you list their initializers. Importantly, the second constructor is UB:
explicit SocketImpl(socket_type sock)
: resolver_(sock.get_executor()), timeout_(sock.get_executor()), socket_(std::move(sock)) {}
Because socket_ is declared before timeout_, it is also initialized before timeout_, which means that sock.get_executor() is actually a use-after-move. Oops. Fix it:
explicit SocketImpl(socket_type sock)
: resolver_(sock.get_executor()), socket_(std::move(sock)), timeout_(socket_.get_executor()) {}
Now, even though the other constructor has no such problem, it's best to match the declaration order there too:
explicit SocketImpl(Executor executor)
: resolver_(executor)
, socket_(executor)
, timeout_(executor) {}
explicit SocketImpl(socket_type sock)
: resolver_(sock.get_executor())
, socket_(std::move(sock))
, timeout_(socket_.get_executor()) {}
(Kudos for making the constructors explicit.)
I would implement any Impl class inline (the naming indicates that the whole class is an "implementation detail" anyway).
A destructor like this is busy-work:
template <typename socket_type, typename resolver_type, typename endpoint_iter_type>
SocketImpl<socket_type, resolver_type, endpoint_iter_type>::~SocketImpl() {
if (socket_.is_open()) {
socket_.close();
}
}
The default destructor of socket_ already does that. All you achieve is getting in the way of the compiler generating optimal, exception-safe code. E.g., in this case close() may throw an exception. Do you want that?
Consider passing by const reference, or by value if you intend to std::move() from them:
virtual void do_resolve(std::string host, std::string port,
ConnectCallback const&,
ErrorCallback const&) = 0;
These instantiations:
template <Type>
struct Socket
: public SocketImpl<boost::asio::ip::tcp::socket,
boost::asio::ip::tcp::resolver,
boost::asio::ip::tcp::resolver::iterator> {
and
template <>
struct Socket<UDP>
: public SocketImpl<boost::asio::ip::udp::socket,
boost::asio::ip::udp::resolver,
boost::asio::ip::udp::resolver::iterator> {
look like hard work. Why not just use Asio's generic templates with the protocol? You can even get a free performance optimization by allowing callers to override the type-erased executor type:
template <typename Protocol,
typename Executor = boost::asio::any_io_executor>
struct SocketImpl
: public std::enable_shared_from_this<SocketImpl<Protocol, Executor>> {
public:
using base_type = SocketImpl<Protocol, Executor>;
using socket_type = std::conditional_t<
std::is_same_v<Protocol, boost::asio::ip::udp>,
boost::asio::basic_datagram_socket<Protocol, Executor>,
boost::asio::basic_socket<Protocol, Executor>>;
using resolver_type =
boost::asio::ip::basic_resolver<Protocol, Executor>;
using endpoint_iter_type = typename resolver_type::iterator;
Now your instantiations can be:
template <Type> struct Socket : public SocketImpl<boost::asio::ip::tcp> {
// ...
template <> struct Socket<UDP> : public SocketImpl<boost::asio::ip::udp> {
for exactly the behavior you had, or better yet:
using StrandEx = boost::asio::strand<boost::asio::io_context::executor_type>;
template <Type> struct Socket : public SocketImpl<boost::asio::ip::tcp, StrandEx> {
// ...
template <> struct Socket<UDP> : public SocketImpl<boost::asio::ip::udp, StrandEx> {
so the executor is optimized for the strand, since you restrict it to that anyway!
Instead of repeating the type arguments:
explicit Socket(boost::asio::ip::tcp::socket sock) : SocketImpl(std::move(sock)) {
refer to the exposed typedefs, so you have a single source of truth:
explicit Socket(base_type::socket_type sock) : SocketImpl(std::move(sock)) {
Pass executors by value. They are cheap to copy, and you could even move from them, since you are "funneling" them into your members anyway.
In fact, just inherit the constructors wholesale instead of repeating them. So even this:
template <Type>
struct Socket : public SocketImpl<boost::asio::ip::tcp, StrandEx> {
explicit Socket(StrandEx executor) : SocketImpl(executor) {}
explicit Socket(base_type::socket_type sock)
: SocketImpl(std::move(sock)) {}
could simply be:
template <Type>
struct Socket : public SocketImpl<boost::asio::ip::tcp, StrandEx> {
using base_type::base_type;
and leave you with exactly the same set of constructors.
That constructor setting is_connected is lying: it sets it to true when the socket is merely open. You don't want this, nor do you need it; nobody in your code uses it. What you may want in clients is a state machine, but that is up to them. There is no need to add a racy, lying flag to your base class's interface. Leave the responsibility where it belongs.
In the same vein:
#ifdef OS_WIN
SetThreadUILanguage(MAKELANGID(LANG_ENGLISH, SUBLANG_ENGLISH_US));
#endif
This violates separation of concerns. You may want this behavior, but your callers/users may want something else. Worse, this behavior may break their code when they have different preferences.
Get() does nothing except obscure the fact that it returns shared_from_this. If it is there to avoid having to qualify with this-> explicitly (because the base class is a dependent type), again use a using declaration:
using std::enable_shared_from_this<SocketImpl>::shared_from_this;
Making PostCallback a std::function is a big problem: it hides the associated executor type! See "boost::asio::bind_executor does not execute in strand" for a detailed explanation of this issue.
In your case there is zero reason to type-erase the Post argument, so this:
void Post(PostCallback callback) {
post(socket_.get_executor(), std::move(callback));
}
should be:
template <typename PostCallback>
void Post(PostCallback&& callback) {
post(socket_.get_executor(), std::forward<PostCallback>(callback));
}
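To see concretely why type-erasing into std::function matters, here is a minimal standalone sketch (the io and my_strand variables are only illustrative): once a handler bound with bind_executor is wrapped in a std::function, Asio can no longer see its associated executor and falls back to the default.
boost::asio::io_context io;
auto my_strand = boost::asio::make_strand(io);
auto bound = boost::asio::bind_executor(my_strand, [] { /* some work */ });
std::function<void()> erased = bound;                     // type erasure happens here
auto ex1 = boost::asio::get_associated_executor(bound);  // yields the strand
auto ex2 = boost::asio::get_associated_executor(erased); // yields the fallback (system) executor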
I would do the same for the other callback types:
using ConnectCallback = std::function<void()>;
using PromoteCallback = std::function<void()>;
using WriteCallback = std::function<void(size_t)>;
using ReadCallback = std::function<void(const uint8_t*, size_t)>;
using ErrorCallback = std::function<void(const std::string&)>;
but I'll leave that as an exercise for the reader for now.
Socket<UDP>::Promote is strange. First of all, I question the logic:
void Socket<UDP>::Promote(const PromoteCallback &callback) {
auto self = shared_from_this();
Post([this, self, callback] {
endpoint_iter_++;
socket_.cancel();
callback();
});
}
Incrementing endpoint_iter_ without checking whether it is already past-the-end seems wrong to me.
Also, nothing stops this from running before async_resolve has completed. I think it is cleaner to cancel the pending operations before incrementing that iterator.
Finally, callback() is void(void), so you really just want the task to complete synchronously. I would suggest a future for that:
std::future<void> Promote() {
return Post(std::packaged_task<void()>([this, self = shared_from_this()] {
socket_.cancel(); // TODO wait for that to complete before incrementing
endpoint_iter_++;
}));
}
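A brief usage sketch (the udp variable and the surrounding threading are assumptions, not part of the reviewed code): the returned future lets a caller that is not running the io_context wait until the posted task has actually executed on the socket's executor.
// assuming `udp` is a std::shared_ptr to a socket with the future-returning Promote
std::future<void> done = udp->Promote();
done.wait(); // by now, the cancel + increment have run on the socket's executor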
A class template whose template parameter is never used is a clear sign that it doesn't need to be a template. Socket<TCP> and Socket<UDP> are not related.
Separating the conjoined twins makes their lives easier:
struct TCPSocket : SocketImpl<asio::ip::tcp, StrandEx> { /*...*/ };
struct UDPSocket : SocketImpl<asio::ip::udp, StrandEx> { /*...*/ };
If for some mysterious reason you really do want the template definitions:
template <Type> struct Socket;
template <> struct Socket<TCP> : TCPSocket { using TCPSocket::TCPSocket; };
template <> struct Socket<UDP> : UDPSocket { using UDPSocket::UDPSocket; };
I hope their triviality drives home the point that the types don't need to be related.
The code for deadline is missing, and you are calling it with the Handler callback even though it doesn't take any arguments. I filled in the missing piece:
template <typename Handler>
void Await(boost::posix_time::time_duration ms, Handler f) {
Post([this, self = shared_from_this(), ms, f] {
timeout_.expires_from_now(ms);
timeout_.template async_wait(
[self, f = std::move(f)](error_code ec) {
if (!ec) {
asio::dispatch(std::move(f));
}
});
});
}
stop_await() makes it even more mysterious: timeout_ gets cancelled regardless of who posted the Await, and the timing does suggest you wanted it to act as a deadline, in which case a user callback really doesn't apply.
However, I cannot explain why timeout_ gets re-armed automatically (although it won't after being cancelled, due to the if (!ec) check in the lambda). I admit I really can't figure it out, so you will have to decide for yourself what you want it to do.
The Read/ReadUntil interface is quite limited. I don't see how I would read a simple HTTP response with it, for example. For my example below, I just read the response headers instead.
You should always prefer std::span or std::string_view over charT const*, size_t pairs. It is simply less error-prone and far more expressive:
using Data = std::basic_string_view<uint8_t>; // or span and similar
// ... eg:
using ReadCallback = std::function<void(Data)>;
Wait, what is this?
asio::ip::udp::endpoint endpoint = *endpoint_iter_;
socket_.async_receive_from(
asio::buffer(buff_.prepare(size)), endpoint,
Are you trying to overwrite the endpoint from the resolver results? That makes no sense.
Note that async_receive_from uses the endpoint reference argument to report the source of the incoming message. You are passing a reference to a local variable here, leading to Undefined Behavior, because the asynchronous operation will complete after the local variable has gone out of scope. Use a member variable instead:
asio::ip::udp::endpoint sender_;
void Read(size_t size, const ReadCallback &read_callback, const ErrorCallback &error_callback) override {
Post([=, this, self = shared_from_this()] {
socket_.async_receive_from(
asio::buffer(buff_.prepare(size)), sender_,
A streambuf seems like overkill for most operations, and a poor fit for a datagram protocol. Besides, you declare it in the base class but never use it there. Consider moving it into the TCP/UDP derived classes.
The whole do_connect/do_resolve business is something that I think belongs in SocketImpl. It is basically the same for TCP and UDP, and if it isn't in the base class while Read[Until]/Send are already per-protocol, I don't really see why you would have a base class at all.
I would switch on an IS_DATAGRAM property like
static constexpr bool is_datagram = std::is_same_v<Protocol, asio::ip::udp>;
and have a single implementation:
void do_resolve(std::string const& host, std::string const& port,
ConnectCallback connect_callback,
ErrorCallback error_callback) {
resolver_.async_resolve(
host, port,
[=, this, self = shared_from_this()](
error_code ec, endpoint_iter_type endpoints) {
stop_await();
if (!ec) {
endpoint_iter_ = std::move(endpoints);
if constexpr (is_datagram) {
socket_.open(endpoint_iter_->endpoint().protocol());
connect_callback();
} else {
do_connect(endpoint_iter_, connect_callback,
error_callback);
}
} else {
error_callback("Unable to resolve host: " +
ec.message());
}
});
}
In case you are wondering about do_connect and whether it ought to stay virtual: for virtual methods to make sense there should be a shared interface, which you currently don't have. So either create a base class interface, for example:
struct ISocket {
virtual ~ISocket() = default;
virtual void Send(Data msg, const WriteCallback &write_callback, const ErrorCallback &error_callback) = 0;
virtual void Read(size_t size, const ReadCallback &read_callback, const ErrorCallback &error_callback) = 0;
};
template <typename Protocol, typename Executor = asio::any_io_executor>
struct SocketImpl
: public std::enable_shared_from_this<SocketImpl<Protocol, Executor>>
, ISocket { // ....
Note that this makes it more important to have the virtual destructor. (Although using make_shared<ConcreteType>
can save you because the shared pointers contain the right deleter)
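As a reminder of why make_shared saves you, a minimal standalone sketch (hypothetical types B and D): the control block created by make_shared stores a deleter for the concrete type, so the correct destructor runs even when the last shared_ptr refers to the base and the base has no virtual destructor.
struct B { };                                   // note: no virtual destructor
struct D : B { ~D() { /* still runs */ } };
std::shared_ptr<B> p = std::make_shared<D>();   // the stored deleter destroys a D, not a B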
In my opinion Await should then also be virtual. But you made it a template member function (which is actually the right thing to do, IMO).
Alternatively, don't go for virtual, and embrace the fact that you don't need a shared interface based on dynamic polymorphism.
If you do need to make the SocketImpl behavior depend on the derived class, you could make it CRTP (the Curiously Recurring Template Pattern) instead.
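For reference only, a minimal CRTP sketch with hypothetical names (note that the listing below does not actually use CRTP): the base knows the most-derived type at compile time and can call into it without any virtual dispatch.
template <typename Derived> struct SocketBase {
    void Connect() {
        // statically dispatch to the derived class's protocol-specific step
        static_cast<Derived&>(*this).do_resolve();
    }
};
struct TcpLike : SocketBase<TcpLike> {
    void do_resolve() { /* protocol-specific resolve/connect */ }
};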
That's what I've done below.
Adapted listing and demo
Here is the listing with a demo. namespace network went from 313 lines down to 229 lines.
#include <boost/asio.hpp>
#include <array>
#include <cstdint>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <string>
#include <string_view>
using Data = std::basic_string_view<uint8_t>; // or span and similar
namespace network {
namespace asio = boost::asio;
using boost::system::error_code;
template <typename Protocol, typename Executor = asio::any_io_executor>
struct SocketImpl
: public std::enable_shared_from_this<SocketImpl<Protocol, Executor>> {
public:
using base_type = SocketImpl<Protocol, Executor>;
static constexpr bool is_datagram = std::is_same_v<Protocol, asio::ip::udp>;
using socket_type = std::conditional_t<is_datagram,
asio::basic_datagram_socket<Protocol, Executor>,
asio::basic_stream_socket<Protocol, Executor>>;
using resolver_type = asio::ip::basic_resolver<Protocol, Executor>;
using endpoint_iter_type = typename resolver_type::iterator;
using std::enable_shared_from_this<SocketImpl>::shared_from_this;
using ConnectCallback = std::function<void()>;
using PromoteCallback = std::function<void()>;
using WriteCallback = std::function<void(size_t)>;
using ReadCallback = std::function<void(Data)>;
using ErrorCallback = std::function<void(const std::string&)>;
explicit SocketImpl(Executor executor)
: resolver_(executor)
, socket_(executor)
, timeout_(executor) {}
explicit SocketImpl(socket_type sock)
: resolver_(sock.get_executor())
, socket_(std::move(sock))
, timeout_(socket_.get_executor()) {}
template <typename Token> decltype(auto) Post(Token&& callback) {
return asio::post(socket_.get_executor(), std::forward<Token>(callback));
}
void Connect(std::string Host, std::string Port,
const ConnectCallback& connect_callback,
const ErrorCallback& error_callback) {
Post([=, self = shared_from_this()] {
self->do_resolve(Host, Port, connect_callback, error_callback);
});
}
template <typename Handler>
void Await(boost::posix_time::time_duration ms, Handler f) {
Post([this, self = shared_from_this(), ms, f] {
timeout_.expires_from_now(ms);
timeout_.template async_wait(
[self, f = std::move(f)](error_code ec) {
if (!ec) {
asio::dispatch(std::move(f));
}
});
});
}
void Disconnect() {
Post([this, self = shared_from_this()] {
timeout_.cancel();
resolver_.cancel();
if (socket_.is_open()) {
socket_.cancel();
}
});
}
protected:
void stop_await() { timeout_.cancel(); }
void do_resolve(std::string const& host, std::string const& port,
ConnectCallback connect_callback,
ErrorCallback error_callback) {
resolver_.async_resolve(
host, port,
[=, this, self = shared_from_this()](
error_code ec, endpoint_iter_type endpoints) {
stop_await();
if (!ec) {
endpoint_iter_ = std::move(endpoints);
if constexpr (is_datagram) {
socket_.open(endpoint_iter_->endpoint().protocol());
connect_callback();
} else {
do_connect(endpoint_iter_, connect_callback,
error_callback);
}
} else {
error_callback("Unable to resolve host: " + ec.message());
}
});
}
void do_connect(endpoint_iter_type endpoints,
ConnectCallback connect_callback,
ErrorCallback error_callback) {
async_connect( //
socket_, std::move(endpoints),
[=, this, self = shared_from_this()](error_code ec, endpoint_iter_type) {
stop_await();
if (!ec) {
connect_callback();
} else {
error_callback("Unable to connect host: " + ec.message());
}
});
}
resolver_type resolver_;
endpoint_iter_type endpoint_iter_;
socket_type socket_;
asio::deadline_timer timeout_;
};
using StrandEx = asio::strand<asio::io_context::executor_type>;
struct TCPSocket : SocketImpl<asio::ip::tcp, StrandEx> {
using base_type::base_type;
void Send(Data msg, WriteCallback write_callback, ErrorCallback error_callback) {
Post([=, this, self = shared_from_this()] {
async_write(socket_, asio::buffer(msg),
[self, write_callback,
error_callback](error_code ec, size_t xfr) {
if (!ec) {
write_callback(xfr);
} else {
error_callback(ec.message());
}
});
});
}
void Read(size_t size, ReadCallback read_callback, ErrorCallback error_callback) {
Post([=, this, self = shared_from_this()] {
async_read(
socket_, asio::buffer(buff_.prepare(size)),
[this, self, read_callback, error_callback](error_code ec,
size_t length) {
stop_await();
if (!ec) {
auto data =
asio::buffer_cast<const uint8_t*>(buff_.data());
read_callback({data, length});
} else {
error_callback(ec.message());
}
buff_.consume(length);
});
});
}
void ReadUntil(std::string until_str, const ReadCallback &read_callback, const ErrorCallback &error_callback) {
Post([=, this, self = shared_from_this()] {
async_read_until(
socket_, buff_, until_str,
[=, this](error_code ec, size_t xfr) {
stop_await();
if (!ec) {
auto data =
asio::buffer_cast<const uint8_t*>(buff_.data());
read_callback({data, xfr});
} else {
error_callback(ec.message());
}
buff_.consume(xfr);
});
});
}
protected:
asio::streambuf buff_;
};
struct UDPSocket : SocketImpl<asio::ip::udp, StrandEx> {
using base_type::base_type;
void Send(Data msg, WriteCallback write_callback, ErrorCallback error_callback) {
Post([=, this, self = shared_from_this()] {
socket_.async_send_to( //
asio::buffer(msg), *endpoint_iter_,
[=](error_code ec, size_t xfr) {
if (!ec) {
write_callback(xfr);
} else {
error_callback(ec.message());
}
});
});
}
void Read(size_t max_size, ReadCallback read_callback, ErrorCallback error_callback) {
Post([=, this, self = shared_from_this()] {
socket_.async_receive_from(
asio::buffer(buff_, max_size), sender_,
[this, self, read_callback, error_callback](error_code ec, size_t xfr) {
this->stop_await();
if (!ec) {
read_callback({buff_.data(), xfr});
} else {
error_callback(ec.message());
}
});
});
}
std::future<void> Promote() {
return Post(std::packaged_task<void()>([this, self = shared_from_this()] {
socket_.cancel(); // TODO wait for that to complete before incrementing
endpoint_iter_++;
}));
}
protected:
asio::ip::udp::endpoint sender_;
std::array<uint8_t, 65530> buff_; // or whatever lower limit you accept
};
enum Type { UDP, TCP };
template <Type> struct Socket;
template <> struct Socket<TCP> : TCPSocket { using TCPSocket::TCPSocket; };
template <> struct Socket<UDP> : UDPSocket { using UDPSocket::UDPSocket; };
} // namespace network
Now we can define simple clients using Socket<> or the direct subtypes:
struct Client : network::Socket<network::TCP> {
using network::Socket<network::TCP>::Socket;
};
struct UDPClient : network::UDPSocket {
using network::UDPSocket::UDPSocket;
};
As you can see, I haven't added any behavior yet.
Instead, let's write a simple HTTP client and a simple UDP echo client directly in main:
static void on_error(std::string const& s) {
std::cout << "Error: " << s << std::endl;
}
int main() {
using boost::posix_time::seconds;
boost::asio::io_context io;
static Data const request{reinterpret_cast<uint8_t const*>("GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")};
static Data const msg{reinterpret_cast<uint8_t const*>("Hello world!\n")};
auto http = std::make_shared<Client>(make_strand(io));
auto echo = std::make_shared<UDPClient>(make_strand(io));
http->Connect(
"example.com", "http",
[http] {
std::cout << "(Http) Connected" << std::endl;
http->Await(seconds(2), [http] {
std::cout << "(Http) Sending" << std::endl;
http->Send(
request,
[http](size_t n) {
std::cout << "(Http) Sent " << n << std::endl;
http->ReadUntil(
"\r\n\r\n",
[http](Data response) {
std::cout << "(Http) Read: ";
std::cout.write(reinterpret_cast<char const*>(
response.data()),
response.size());
std::cout << std::endl;
http->Disconnect();
},
on_error);
},
on_error);
});
},
on_error);
echo->Connect(
"localhost", "echo",
[echo] {
std::cout << "(Echo) Connected" << std::endl;
echo->Await(seconds(1), [echo] {
std::cout << "(Echo) Sending" << std::endl;
echo->Send(
msg,
[echo](size_t n) {
std::cout << "(Echo) Sent " << n << std::endl;
echo->Read(
msg.size(),
[echo](Data response) {
std::cout << "(Echo) Read: ";
std::cout.write(reinterpret_cast<char const*>(
response.data()),
response.size());
std::cout << std::endl;
echo->Disconnect();
},
on_error);
},
on_error);
});
},
on_error);
std::cout << "START" << std::endl;
io.run();
std::cout << "END" << std::endl;
}
You can see it running live on my box: