从 asio 中的外部连接处理程序调用 async_write
Call async_write from outside connect handler in asio
我使用 asio 库建立 TCP 连接。异步 read/write 操作由 async_accept.
的处理函数完成
mAcceptor.async_accept(*(mConnection->Socket()),
boost::bind(&TCPConnection::HandleAcceptForWrite, this,
pI8Msg,
boost::asio::placeholders::error));
void TCPConnection::HandleAcceptForWrite(
INT8* pI8Msg,
const boost::system::error_code& err)
{
if (!err) {
TransmitData(pI8Msg);//--> Want to call this fn from outside the handler
}
SocketAcceptConnection(pI8Msg);
}
我想避免从处理程序中调用 TransmitData (async_write)。
我打算从接受处理程序之外的任何地方调用写入。当我这样做时,我得到了错误 - 'Bad file descriptor'
是否总是需要从处理程序中进行异步写入?如果可以从其他地方调用,请分享任何代码示例。
您应该提供必要的上下文。例如。 mConnection
是什么?为什么 mConnection->Socket()
return 是指针?如果这是一个服务器,为什么它只有一个 mConnection
?
用 Crystal 球进行洞穴探险
用我的 crystal 球,我将用
来回答这个问题
- mConnection 是指向封装套接字和某些连接状态的对象的共享指针
- 它在接受之前用一个新的实例初始化,总是
- 因此,除非其他东西共享
*mConnection
的所有权,否则它将在 mConnection
被分配一个新实例时根据定义被销毁。
综上所述,只有一个合理的解释:mConnection
指向一个从enable_shared_from_this<T>
派生的类型T
,这样它就可以与自己共享所有权。您应该能够在 TransmitData
函数中看到这一点,其中应该在绑定表达式(或 lambda)中捕获共享指针以完成 async_read
完成处理程序。
这样做的目的是:保持连接活动,用 C++ 术语来说:延长或保证完成处理程序 return 之前的生命周期。完成处理程序可能会启动更多共享所有权的工作(捕获共享指针),依此类推,直到拥有连接对象的最后一个操作失去兴趣并且连接对象(T
)被释放。
要做什么?
您需要保持连接有效,即使它处于空闲状态。有很多方法。您可以将指向它的共享指针插入“连接 table”(例如 std::vector<shared_ptr<Connection> >
)。缺点是它变得很难 p运行e 连接:连接总是被拥有,所以永远不会被释放。 (见下文 weak_ptr
!)
在实践中,我会让连接(让我们从现在开始称类型为 Connection
而不是 T
)负责:它可能决定对方何时挂断,或者何时有一个exchange 告诉连接关闭,或者甚至超时。
因为后者很常见(连接经常在一段空闲时间后自动关闭),也最灵活(你可以将到期时间设置为“无限”),让我们展示一下:
Side Note: during my effort to expand your snippet into something working, I noticed that weirdly the acceptor appears to live INSIDE the connection type? That makes no sense because how can you have multiple connections with a single acceptor ("server")? So I changed things around to be more typical
请注意,我仍然保留 table 个连接 (mConnections
),但我没有存储 owning 指针(如 shared_ptr<>
)存储 weak_ptr<>
以便我们可以 观察 连接而不改变它们的生命周期。
我对平时的回答采取了不同的方法,并在评论中指出了很多细节:
#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <memory>
#include <iostream>
#include <list>
// Library facilities
namespace ba = boost::asio;
using boost::asio::ip::tcp;
using boost::system::error_code;
using namespace std::chrono_literals;
// Application types
using INT8 = std::int8_t;
using Executor = ba::io_context::executor_type;
using Duration = std::chrono::steady_clock::duration;
struct TCPConnection : public /*!*/ std::enable_shared_from_this<TCPConnection> {
TCPConnection(Executor ex, Duration timeout = 120s)
: mStrand(ex)
, mSocket(mStrand)
, mIdleTime(timeout)
, mIdleTimer{mStrand}
{
// cannot use `shared_from_this` inside the constructor!
}
tcp::socket& Socket() { return mSocket; }
~TCPConnection() {
// Demo output, to understand TCPConnection shared lifetime
std::cerr << __PRETTY_FUNCTION__ << std::endl;
}
void StartSession() {
// Here we start the idle timer, so we don't disappear until that
// expires;
// We don't post to the strand because we assume this is the very first
// operation on this object (there can only be 1 thread aware of this
// object)
StartTimer();
}
void TransmitData(INT8 const* pI8Msg)
{
post(mStrand, [this, pI8Msg, self = shared_from_this()] {
std::cout << "TransmitData on " << mSocket.remote_endpoint()
<< std::endl;
// Remove the following line to make the timout also fire if the
// writing takes too long
mIdleTimer.cancel(); // We're busy, not idle
// WARNING: assumes
// - lifetime of pI8Msg guaranteed
// - message NUL-terminated
boost::asio::async_write(
mSocket, ba::buffer(std::basic_string_view(pI8Msg)),
boost::bind(&TCPConnection::OnWritten, shared_from_this(),
ba::placeholders::error(),
ba::placeholders::bytes_transferred()));
});
}
private:
void StartTimer() {
mIdleTimer.expires_from_now(mIdleTime);
mIdleTimer.async_wait(boost::bind(&TCPConnection::OnTimer,
shared_from_this(),
ba::placeholders::error()));
}
void OnTimer(error_code ec)
{
if (ec != ba::error::operation_aborted) // cancellation is not timeout
{
std::cout << "TCPConnection timeout, closing " << mSocket.remote_endpoint() << std::endl;
// Timeout, let's respond by closing the socket and letting the
// connection disappear
// No need to post to the strand since the completion already
// happens on the strand
mSocket.cancel();
mSocket.shutdown(tcp::socket::shutdown_both);
// No more async operations so shared_from_this goes out of scope
}
}
void OnWritten(error_code ec, size_t bytes_transferred)
{
if (ec.failed()) {
std::cerr << "OnWritten: " << ec.message() << std::endl;
// Error, we let the connection die
// In case the timer wasn't canceled pre-write (see comment in
// TransmitData) let's make sure it's canceled here
mIdleTimer.cancel();
// ignoring errors:
mSocket.shutdown(tcp::socket::shutdown_both, ec);
mSocket.close(ec);
}
// write was successful, let's restart timer
std::cerr << "OnWritten: " << ec.message() << " (" << bytes_transferred
<< " bytes)" << std::endl;
StartTimer();
}
ba::strand<Executor> mStrand; // serialize execution
tcp::socket mSocket;
Duration mIdleTime;
ba::steady_timer mIdleTimer{mStrand, mIdleTime};
};
struct Server {
Server(Executor ex, uint16_t port)
: mExecutor(ex)
, mAcceptor{make_strand(mExecutor), {{}, port}}
{
AcceptLoop();
}
void TransmitData(INT8 const* pI8Msg) {
for (auto& weak : mConnections) {
if (auto conn = weak.lock()) {
conn->TransmitData(pI8Msg);
}
}
}
private:
std::shared_ptr<TCPConnection> mConnection;
std::list<std::weak_ptr<TCPConnection> > mConnections;
void AcceptLoop()
{
mConnection = std::make_shared<TCPConnection>(mExecutor, 1s);
mAcceptor.async_accept(mConnection->Socket(),
boost::bind(&Server::HandleAccept, this,
boost::asio::placeholders::error()));
}
void HandleAccept(error_code err)
{
if (!err) {
std::cerr << "HandleAccept: "
<< mConnection->Socket().remote_endpoint() << std::endl;
mConnection->StartSession(); // no tramsmit, just a keeping
// the connection open
mConnections.push_back(
mConnection); // store so we can send messages later
// optional: garbage collect the mConnections table
mConnections.remove_if(
std::mem_fn(&std::weak_ptr<TCPConnection>::expired));
// Keep accepting connections
AcceptLoop();
}
}
Executor mExecutor;
tcp::acceptor mAcceptor;
};
int main() {
ba::io_context mIo;
Server srv(mIo.get_executor(), 7979);
std::thread delayed_transmission([&srv] {
static INT8 const message[] =
"HELLO WORLD\n"; // NUL-terminated and static!
std::this_thread::sleep_for(3s);
std::cout << "Sending '" << message << "'" << std::endl;
srv.TransmitData(message);
});
// mIo.run();
mIo.run_for(10s); // timelimited for Coliru
delayed_transmission.join();
}
这是实时 运行 的演示,展示了超时的连接以及成功的 TransmitData
调用:
- 我设置超时为1s,
- 服务器启动3秒后传输完成(
delayed_transmission
)
- 我提前开始一组 3 个连接,超时,
- 我马上开始另外一组3个连接,正好及时收到延迟传输
- 总服务器 运行 时间限制为 10 秒用于现场演示
文本输出
便于参考(摘自http://coliru.stacked-crooked.com/a/def4f1927c7b975f):
服务器
./a.out &
HandleAccept: 127.0.0.1:43672
HandleAccept: 127.0.0.1:43674
HandleAccept: 127.0.0.1:43676
TCPConnection timeout, closing 127.0.0.1:43672
TCPConnection::~TCPConnection()
TCPConnection timeout, closing 127.0.0.1:43674
TCPConnection::~TCPConnection()
TCPConnection timeout, closing 127.0.0.1:43676
TCPConnection::~TCPConnection()
HandleAccept: 127.0.0.1:43678
HandleAccept: 127.0.0.1:43680
HandleAccept: 127.0.0.1:43682
Sending 'HELLO WORLD
'
TransmitData on 127.0.0.1:43678
TransmitData on 127.0.0.1:43680
TransmitData on 127.0.0.1:43682
OnWritten: SuccessHELLO WORLD
(12 bytes)
OnWritten: Success (12 bytes)
OnWritten: Success (12 bytes)
TCPConnection timeout, closing 127.0.0.1:43678
TCPConnection::~TCPConnection()
TCPConnection timeout, closing 127.0.0.1:43680
TCPConnection::~TCPConnection()
TCPConnection timeout, closing 127.0.0.1:43682
TCPConnection::~TCPConnection()
客户
sleep 1
(for a in {1..3}; do netcat 127.0.0.1 7979& done; time wait)
real 0m1.011s
user 0m0.012s
sys 0m0.004s
sleep 0.5
(for a in {1..3}; do netcat 127.0.0.1 7979& done; time wait)
HELLO WORLD
HELLO WORLD
HELLO WORLD
real 0m1.478s
user 0m0.008s
sys 0m0.008s
我使用 asio 库建立 TCP 连接。异步 read/write 操作由 async_accept.
的处理函数完成 mAcceptor.async_accept(*(mConnection->Socket()),
boost::bind(&TCPConnection::HandleAcceptForWrite, this,
pI8Msg,
boost::asio::placeholders::error));
void TCPConnection::HandleAcceptForWrite(
INT8* pI8Msg,
const boost::system::error_code& err)
{
if (!err) {
TransmitData(pI8Msg);//--> Want to call this fn from outside the handler
}
SocketAcceptConnection(pI8Msg);
}
我想避免从处理程序中调用 TransmitData (async_write)。
我打算从接受处理程序之外的任何地方调用写入。当我这样做时,我得到了错误 - 'Bad file descriptor'
是否总是需要从处理程序中进行异步写入?如果可以从其他地方调用,请分享任何代码示例。
您应该提供必要的上下文。例如。 mConnection
是什么?为什么 mConnection->Socket()
return 是指针?如果这是一个服务器,为什么它只有一个 mConnection
?
用 Crystal 球进行洞穴探险
用我的 crystal 球,我将用
来回答这个问题- mConnection 是指向封装套接字和某些连接状态的对象的共享指针
- 它在接受之前用一个新的实例初始化,总是
- 因此,除非其他东西共享
*mConnection
的所有权,否则它将在mConnection
被分配一个新实例时根据定义被销毁。
综上所述,只有一个合理的解释:mConnection
指向一个从enable_shared_from_this<T>
派生的类型T
,这样它就可以与自己共享所有权。您应该能够在 TransmitData
函数中看到这一点,其中应该在绑定表达式(或 lambda)中捕获共享指针以完成 async_read
完成处理程序。
这样做的目的是:保持连接活动,用 C++ 术语来说:延长或保证完成处理程序 return 之前的生命周期。完成处理程序可能会启动更多共享所有权的工作(捕获共享指针),依此类推,直到拥有连接对象的最后一个操作失去兴趣并且连接对象(T
)被释放。
要做什么?
您需要保持连接有效,即使它处于空闲状态。有很多方法。您可以将指向它的共享指针插入“连接 table”(例如 std::vector<shared_ptr<Connection> >
)。缺点是它变得很难 p运行e 连接:连接总是被拥有,所以永远不会被释放。 (见下文 weak_ptr
!)
在实践中,我会让连接(让我们从现在开始称类型为 Connection
而不是 T
)负责:它可能决定对方何时挂断,或者何时有一个exchange 告诉连接关闭,或者甚至超时。
因为后者很常见(连接经常在一段空闲时间后自动关闭),也最灵活(你可以将到期时间设置为“无限”),让我们展示一下:
Side Note: during my effort to expand your snippet into something working, I noticed that weirdly the acceptor appears to live INSIDE the connection type? That makes no sense because how can you have multiple connections with a single acceptor ("server")? So I changed things around to be more typical
请注意,我仍然保留 table 个连接 (mConnections
),但我没有存储 owning 指针(如 shared_ptr<>
)存储 weak_ptr<>
以便我们可以 观察 连接而不改变它们的生命周期。
我对平时的回答采取了不同的方法,并在评论中指出了很多细节:
#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <memory>
#include <iostream>
#include <list>
// Library facilities
namespace ba = boost::asio;
using boost::asio::ip::tcp;
using boost::system::error_code;
using namespace std::chrono_literals;
// Application types
using INT8 = std::int8_t;
using Executor = ba::io_context::executor_type;
using Duration = std::chrono::steady_clock::duration;
struct TCPConnection : public /*!*/ std::enable_shared_from_this<TCPConnection> {
TCPConnection(Executor ex, Duration timeout = 120s)
: mStrand(ex)
, mSocket(mStrand)
, mIdleTime(timeout)
, mIdleTimer{mStrand}
{
// cannot use `shared_from_this` inside the constructor!
}
tcp::socket& Socket() { return mSocket; }
~TCPConnection() {
// Demo output, to understand TCPConnection shared lifetime
std::cerr << __PRETTY_FUNCTION__ << std::endl;
}
void StartSession() {
// Here we start the idle timer, so we don't disappear until that
// expires;
// We don't post to the strand because we assume this is the very first
// operation on this object (there can only be 1 thread aware of this
// object)
StartTimer();
}
void TransmitData(INT8 const* pI8Msg)
{
post(mStrand, [this, pI8Msg, self = shared_from_this()] {
std::cout << "TransmitData on " << mSocket.remote_endpoint()
<< std::endl;
// Remove the following line to make the timout also fire if the
// writing takes too long
mIdleTimer.cancel(); // We're busy, not idle
// WARNING: assumes
// - lifetime of pI8Msg guaranteed
// - message NUL-terminated
boost::asio::async_write(
mSocket, ba::buffer(std::basic_string_view(pI8Msg)),
boost::bind(&TCPConnection::OnWritten, shared_from_this(),
ba::placeholders::error(),
ba::placeholders::bytes_transferred()));
});
}
private:
void StartTimer() {
mIdleTimer.expires_from_now(mIdleTime);
mIdleTimer.async_wait(boost::bind(&TCPConnection::OnTimer,
shared_from_this(),
ba::placeholders::error()));
}
void OnTimer(error_code ec)
{
if (ec != ba::error::operation_aborted) // cancellation is not timeout
{
std::cout << "TCPConnection timeout, closing " << mSocket.remote_endpoint() << std::endl;
// Timeout, let's respond by closing the socket and letting the
// connection disappear
// No need to post to the strand since the completion already
// happens on the strand
mSocket.cancel();
mSocket.shutdown(tcp::socket::shutdown_both);
// No more async operations so shared_from_this goes out of scope
}
}
void OnWritten(error_code ec, size_t bytes_transferred)
{
if (ec.failed()) {
std::cerr << "OnWritten: " << ec.message() << std::endl;
// Error, we let the connection die
// In case the timer wasn't canceled pre-write (see comment in
// TransmitData) let's make sure it's canceled here
mIdleTimer.cancel();
// ignoring errors:
mSocket.shutdown(tcp::socket::shutdown_both, ec);
mSocket.close(ec);
}
// write was successful, let's restart timer
std::cerr << "OnWritten: " << ec.message() << " (" << bytes_transferred
<< " bytes)" << std::endl;
StartTimer();
}
ba::strand<Executor> mStrand; // serialize execution
tcp::socket mSocket;
Duration mIdleTime;
ba::steady_timer mIdleTimer{mStrand, mIdleTime};
};
struct Server {
Server(Executor ex, uint16_t port)
: mExecutor(ex)
, mAcceptor{make_strand(mExecutor), {{}, port}}
{
AcceptLoop();
}
void TransmitData(INT8 const* pI8Msg) {
for (auto& weak : mConnections) {
if (auto conn = weak.lock()) {
conn->TransmitData(pI8Msg);
}
}
}
private:
std::shared_ptr<TCPConnection> mConnection;
std::list<std::weak_ptr<TCPConnection> > mConnections;
void AcceptLoop()
{
mConnection = std::make_shared<TCPConnection>(mExecutor, 1s);
mAcceptor.async_accept(mConnection->Socket(),
boost::bind(&Server::HandleAccept, this,
boost::asio::placeholders::error()));
}
void HandleAccept(error_code err)
{
if (!err) {
std::cerr << "HandleAccept: "
<< mConnection->Socket().remote_endpoint() << std::endl;
mConnection->StartSession(); // no tramsmit, just a keeping
// the connection open
mConnections.push_back(
mConnection); // store so we can send messages later
// optional: garbage collect the mConnections table
mConnections.remove_if(
std::mem_fn(&std::weak_ptr<TCPConnection>::expired));
// Keep accepting connections
AcceptLoop();
}
}
Executor mExecutor;
tcp::acceptor mAcceptor;
};
int main() {
ba::io_context mIo;
Server srv(mIo.get_executor(), 7979);
std::thread delayed_transmission([&srv] {
static INT8 const message[] =
"HELLO WORLD\n"; // NUL-terminated and static!
std::this_thread::sleep_for(3s);
std::cout << "Sending '" << message << "'" << std::endl;
srv.TransmitData(message);
});
// mIo.run();
mIo.run_for(10s); // timelimited for Coliru
delayed_transmission.join();
}
这是实时 运行 的演示,展示了超时的连接以及成功的 TransmitData
调用:
- 我设置超时为1s,
- 服务器启动3秒后传输完成(
delayed_transmission
) - 我提前开始一组 3 个连接,超时,
- 我马上开始另外一组3个连接,正好及时收到延迟传输
- 总服务器 运行 时间限制为 10 秒用于现场演示
文本输出
便于参考(摘自http://coliru.stacked-crooked.com/a/def4f1927c7b975f):
服务器
./a.out & HandleAccept: 127.0.0.1:43672 HandleAccept: 127.0.0.1:43674 HandleAccept: 127.0.0.1:43676 TCPConnection timeout, closing 127.0.0.1:43672 TCPConnection::~TCPConnection() TCPConnection timeout, closing 127.0.0.1:43674 TCPConnection::~TCPConnection() TCPConnection timeout, closing 127.0.0.1:43676 TCPConnection::~TCPConnection() HandleAccept: 127.0.0.1:43678 HandleAccept: 127.0.0.1:43680 HandleAccept: 127.0.0.1:43682 Sending 'HELLO WORLD ' TransmitData on 127.0.0.1:43678 TransmitData on 127.0.0.1:43680 TransmitData on 127.0.0.1:43682 OnWritten: SuccessHELLO WORLD (12 bytes) OnWritten: Success (12 bytes) OnWritten: Success (12 bytes) TCPConnection timeout, closing 127.0.0.1:43678 TCPConnection::~TCPConnection() TCPConnection timeout, closing 127.0.0.1:43680 TCPConnection::~TCPConnection() TCPConnection timeout, closing 127.0.0.1:43682 TCPConnection::~TCPConnection()
客户
sleep 1 (for a in {1..3}; do netcat 127.0.0.1 7979& done; time wait) real 0m1.011s user 0m0.012s sys 0m0.004s sleep 0.5 (for a in {1..3}; do netcat 127.0.0.1 7979& done; time wait) HELLO WORLD HELLO WORLD HELLO WORLD real 0m1.478s user 0m0.008s sys 0m0.008s