asio:如何将对象从一个 io 上下文传递到另一个
asio: how to pass object from one io context to another
我想更好地理解 async asio 的工作原理。
我有以下代码,我在套接字上调用 async_read 以读取接下来的 10 个字节的数据。
struct SocketReader {
void do_read_body()
{
asio::async_read(socket_,
asio::buffer(msg_, 10),
[this](asio::error_code ec, std::size_t length)
{
if (!ec)
{
//messages_to_work_on.emplace_back(msg_); // <-- I'm trying to send this msg_ instance to another io_context
do_read_body(); // call again
}
else
{
socket_.close();
}
});
}
std::vector<uint8_t> msg_;
asio::tcp::socket _socket;
}
这些读取是在他自己的 std::thread 中的 io_context 运行 内完成的,我在队列中收集从套接字读取的所有消息。到目前为止一切顺利。
我还有另一个“工人”class,它只是根据队列中可用的内容执行一些工作:
struct Worker
{
asio::io_context& io_context_;
std::deque< std::vector<uint8_t> > queue;
Worker(asio::io_context& io_context)
: io_context_(io_context) {
asio::post(io_context_, [this]() {doWork();});
}
void doWork() {
if (!queue.empty())
{
// do some work with front()
queue.pop_front();
}
asio::post(io_context_, [this]() {doWork();});
}
};
那个也在他自己的io_context,运行他自己的线程中执行。所以socket线程和工作线程之间存在并发。
将从套接字接收到的数据 post 发送给 worker class 的正确方法是什么?
我在想我应该能够从套接字完成处理程序中调用,例如:
asio::post(worker_io_context, [this]() {worker.queue.push_back(msg_)});
这样一来,我至少可以确定没有同时使用工作队列。
但我不确定是否允许我从一个 io_context 到另一个 post,以及我是否不会以这种方式创建另一个竞争条件。
我也不太明白我的消息的内存应该放在哪里,尤其是从一个 io_context 到另一个的传输“之间”。是否需要按值传递消息(因为可以在执行 post 处理程序之前修改 this.msg_)?
谢谢!
注意:您不需要额外的 io_context。
如果您必须进行长 运行 计算,您可以编写自己的 async_xyz 函数并像其他异步函数一样使用它。这个想法是 post 将工作交给 boost 线程池在那里进行计算,然后在工作完成时调用完成处理程序。这是一个使用 boost 线程池对密码进行耗时散列的示例。
template <boost::asio::completion_token_for<void (std::string)> CompletionToken>
auto
async_hash (boost::asio::thread_pool &pool, boost::asio::io_context &io_context, std::string const &password, CompletionToken &&token)
{
return boost::asio::async_initiate<CompletionToken, void (std::string)> (
[&] (auto completion_handler, std::string const &passwordToHash) {
auto io_eq = boost::asio::prefer (io_context.get_executor (), boost::asio::execution::outstanding_work.tracked);
boost::asio::post (pool, [&, io_eq = std::move (io_eq), completion_handler = std::move (completion_handler), passwordToHash] () mutable {
auto hashedPw = pw_to_hash (passwordToHash);
boost::asio::post (io_eq, [hashedPw = std::move (hashedPw), completion_handler = std::move (completion_handler)] () mutable { completion_handler (hashedPw); });
});
},
token, password);
}
然后调用它:
auto hashedPw = co_await async_hash (pool, io_context, createAccountObject.password, boost::asio::use_awaitable);
你好像没有使用协程 ts 所以我认为你必须这样做
async_hash (pool, io_context, createAccountObject.password, /*your lambda here*/);
I should be able to call from the socket completion handler,
something like:
asio::post(worker_io_context, [this]() {worker.queue.push_back(msg_)});
当然可以。
That way, I'm at least sure that the worker queue is not used concurently. But I'm not sure if I'm allowed to post from one io_context to the other,
io_context
不是魔术。它们基本上是协作任务队列。
and also if I won't create another race condition this way.
我不会坐在这里不看你的代码就下定论(反正我可能不想读完所有代码),但让我重复一遍:io_context
不是魔法。您可以按照您已经知道的线程、任务和资源方面的方式对它们进行推理。
I also don't really understand where the memory for my message should be located, especially "in between" the transfer from one io_context to the other. Is it required I pass the message by value (since this.msg_ can be modified before the post handler is executed) ?
是的。的确。像
post(worker_io_context, [this, msg=std::move(msg_)]() {worker.queue.push_back(std::move(msg)); });
如果移动不便宜,可以选择使用重新计数的智能指针(如 shared_ptr)。如果您实际上在线程之间共享所有权,请考虑将其设为 smartpointer<T const>
。
洗澡的想法:也许你可以不用“工人”队列。由于您正在转向反应堆式异步(使用 Asio),您可能会专注于对任务进行排队,而不是数据。不这样做的原因包括当你想要优先排队时,负载 balancing/back 压力等。[原则上所有这些都可以使用自定义执行程序来实现,但我会坚持我之前知道的这样做。]
您不需要超过一个 io_context,即使对于多线程应用程序,您也可以只使用一个 io_context。您想使您的 SocketReader 成为共享指针,每次发生读取时,向其添加一个计数。我假设接受器、套接字创建和 some_io_context.run() 部分已完成。我会做这样的事情:
class SocketReader
: public std::enable_shared_from_this<SocketReader> // we need this!
{
public:
// Constructor
SocketReader(io_context& ctx)
: ctx_(ctx)
{
}
// read
void do_read_body()
{
auto self(this->shared_from_this()); // we need this!
asio::async_read(socket_,
asio::buffer(msg_, 10),
[this](asio::error_code ec, std::size_t length)
{
if (!ec)
{
// later edit - @sehe is spot on -> is better to move it
asio::post(ctx_, [this, msg=std::move(msg_)]() { // do the long work });
do_read_body();
}
else
{
socket_.shutdown(asio::tcp::socket::shutdown_send, ec);
socket_.close();
}
});
}
private:
io_context& ctx_; // l.e. ctx_ must be a reference
vector<uint8_t> msg_;
asio::tcp::socket _socket;
};
...
// somewhere in the code
auto s_reader = std::make_shared<SocketReader>(some_io_context);
s_reader->do_read_body();
我想更好地理解 async asio 的工作原理。
我有以下代码,我在套接字上调用 async_read 以读取接下来的 10 个字节的数据。
struct SocketReader {
void do_read_body()
{
asio::async_read(socket_,
asio::buffer(msg_, 10),
[this](asio::error_code ec, std::size_t length)
{
if (!ec)
{
//messages_to_work_on.emplace_back(msg_); // <-- I'm trying to send this msg_ instance to another io_context
do_read_body(); // call again
}
else
{
socket_.close();
}
});
}
std::vector<uint8_t> msg_;
asio::tcp::socket _socket;
}
这些读取是在他自己的 std::thread 中的 io_context 运行 内完成的,我在队列中收集从套接字读取的所有消息。到目前为止一切顺利。
我还有另一个“工人”class,它只是根据队列中可用的内容执行一些工作:
struct Worker
{
asio::io_context& io_context_;
std::deque< std::vector<uint8_t> > queue;
Worker(asio::io_context& io_context)
: io_context_(io_context) {
asio::post(io_context_, [this]() {doWork();});
}
void doWork() {
if (!queue.empty())
{
// do some work with front()
queue.pop_front();
}
asio::post(io_context_, [this]() {doWork();});
}
};
那个也在他自己的io_context,运行他自己的线程中执行。所以socket线程和工作线程之间存在并发。
将从套接字接收到的数据 post 发送给 worker class 的正确方法是什么? 我在想我应该能够从套接字完成处理程序中调用,例如:
asio::post(worker_io_context, [this]() {worker.queue.push_back(msg_)});
这样一来,我至少可以确定没有同时使用工作队列。 但我不确定是否允许我从一个 io_context 到另一个 post,以及我是否不会以这种方式创建另一个竞争条件。 我也不太明白我的消息的内存应该放在哪里,尤其是从一个 io_context 到另一个的传输“之间”。是否需要按值传递消息(因为可以在执行 post 处理程序之前修改 this.msg_)?
谢谢!
注意:您不需要额外的 io_context。
如果您必须进行长 运行 计算,您可以编写自己的 async_xyz 函数并像其他异步函数一样使用它。这个想法是 post 将工作交给 boost 线程池在那里进行计算,然后在工作完成时调用完成处理程序。这是一个使用 boost 线程池对密码进行耗时散列的示例。
template <boost::asio::completion_token_for<void (std::string)> CompletionToken>
auto
async_hash (boost::asio::thread_pool &pool, boost::asio::io_context &io_context, std::string const &password, CompletionToken &&token)
{
return boost::asio::async_initiate<CompletionToken, void (std::string)> (
[&] (auto completion_handler, std::string const &passwordToHash) {
auto io_eq = boost::asio::prefer (io_context.get_executor (), boost::asio::execution::outstanding_work.tracked);
boost::asio::post (pool, [&, io_eq = std::move (io_eq), completion_handler = std::move (completion_handler), passwordToHash] () mutable {
auto hashedPw = pw_to_hash (passwordToHash);
boost::asio::post (io_eq, [hashedPw = std::move (hashedPw), completion_handler = std::move (completion_handler)] () mutable { completion_handler (hashedPw); });
});
},
token, password);
}
然后调用它:
auto hashedPw = co_await async_hash (pool, io_context, createAccountObject.password, boost::asio::use_awaitable);
你好像没有使用协程 ts 所以我认为你必须这样做
async_hash (pool, io_context, createAccountObject.password, /*your lambda here*/);
I should be able to call from the socket completion handler, something like:
asio::post(worker_io_context, [this]() {worker.queue.push_back(msg_)});
当然可以。
That way, I'm at least sure that the worker queue is not used concurently. But I'm not sure if I'm allowed to post from one io_context to the other,
io_context
不是魔术。它们基本上是协作任务队列。
and also if I won't create another race condition this way.
我不会坐在这里不看你的代码就下定论(反正我可能不想读完所有代码),但让我重复一遍:io_context
不是魔法。您可以按照您已经知道的线程、任务和资源方面的方式对它们进行推理。
I also don't really understand where the memory for my message should be located, especially "in between" the transfer from one io_context to the other. Is it required I pass the message by value (since this.msg_ can be modified before the post handler is executed) ?
是的。的确。像
post(worker_io_context, [this, msg=std::move(msg_)]() {worker.queue.push_back(std::move(msg)); });
如果移动不便宜,可以选择使用重新计数的智能指针(如 shared_ptr)。如果您实际上在线程之间共享所有权,请考虑将其设为 smartpointer<T const>
。
洗澡的想法:也许你可以不用“工人”队列。由于您正在转向反应堆式异步(使用 Asio),您可能会专注于对任务进行排队,而不是数据。不这样做的原因包括当你想要优先排队时,负载 balancing/back 压力等。[原则上所有这些都可以使用自定义执行程序来实现,但我会坚持我之前知道的这样做。]
您不需要超过一个 io_context,即使对于多线程应用程序,您也可以只使用一个 io_context。您想使您的 SocketReader 成为共享指针,每次发生读取时,向其添加一个计数。我假设接受器、套接字创建和 some_io_context.run() 部分已完成。我会做这样的事情:
class SocketReader
: public std::enable_shared_from_this<SocketReader> // we need this!
{
public:
// Constructor
SocketReader(io_context& ctx)
: ctx_(ctx)
{
}
// read
void do_read_body()
{
auto self(this->shared_from_this()); // we need this!
asio::async_read(socket_,
asio::buffer(msg_, 10),
[this](asio::error_code ec, std::size_t length)
{
if (!ec)
{
// later edit - @sehe is spot on -> is better to move it
asio::post(ctx_, [this, msg=std::move(msg_)]() { // do the long work });
do_read_body();
}
else
{
socket_.shutdown(asio::tcp::socket::shutdown_send, ec);
socket_.close();
}
});
}
private:
io_context& ctx_; // l.e. ctx_ must be a reference
vector<uint8_t> msg_;
asio::tcp::socket _socket;
};
...
// somewhere in the code
auto s_reader = std::make_shared<SocketReader>(some_io_context);
s_reader->do_read_body();