asio:如何将对象从一个 io 上下文传递到另一个

asio: how to pass object from one io context to another

我想更好地理解 async asio 的工作原理。

我有以下代码,我在套接字上调用 async_read 以读取接下来的 10 个字节的数据。

struct SocketReader {
    void do_read_body()
    {
        asio::async_read(socket_,
            asio::buffer(msg_, 10),
            [this](asio::error_code ec, std::size_t length)
            {
                if (!ec)
                {
                    //messages_to_work_on.emplace_back(msg_); // <-- I'm trying to send this msg_ instance to another io_context
                    do_read_body(); // call again
                }
                else
                {
                    socket_.close();
                }
            });
    }
std::vector<uint8_t> msg_;
asio::tcp::socket _socket;
}

这些读取是在他自己的 std::thread 中的 io_context 运行 内完成的,我在队列中收集从套接字读取的所有消息。到目前为止一切顺利。

我还有另一个“工人”class,它只是根据队列中可用的内容执行一些工作:

struct Worker
{
    asio::io_context& io_context_;
    std::deque< std::vector<uint8_t> > queue;
    Worker(asio::io_context& io_context)
        : io_context_(io_context) {
        asio::post(io_context_, [this]() {doWork();});
    }
    void doWork() {
        if (!queue.empty())
        {
            // do some work with front()
            queue.pop_front();
        }
        asio::post(io_context_, [this]() {doWork();});
    }
};

那个也在他自己的io_context,运行他自己的线程中执行。所以socket线程和工作线程之间存在并发。

将从套接字接收到的数据 post 发送给 worker class 的正确方法是什么? 我在想我应该能够从套接字完成处理程序中调用,例如:

asio::post(worker_io_context, [this]() {worker.queue.push_back(msg_)});

这样一来,我至少可以确定没有同时使用工作队列。 但我不确定是否允许我从一个 io_context 到另一个 post,以及我是否不会以这种方式创建另一个竞争条件。 我也不太明白我的消息的内存应该放在哪里,尤其是从一个 io_context 到另一个的传输“之间”。是否需要按值传递消息(因为可以在执行 post 处理程序之前修改 this.msg_)?

谢谢!

注意:您不需要额外的 io_context。
如果您必须进行长 运行 计算,您可以编写自己的 async_xyz 函数并像其他异步函数一样使用它。这个想法是 post 将工作交给 boost 线程池在那里进行计算,然后在工作完成时调用完成处理程序。这是一个使用 boost 线程池对密码进行耗时散列的示例。

template <boost::asio::completion_token_for<void (std::string)> CompletionToken>
auto
async_hash (boost::asio::thread_pool &pool, boost::asio::io_context &io_context, std::string const &password, CompletionToken &&token)
{
  return boost::asio::async_initiate<CompletionToken, void (std::string)> (
      [&] (auto completion_handler, std::string const &passwordToHash) {
        auto io_eq = boost::asio::prefer (io_context.get_executor (), boost::asio::execution::outstanding_work.tracked);
        boost::asio::post (pool, [&, io_eq = std::move (io_eq), completion_handler = std::move (completion_handler), passwordToHash] () mutable {
          auto hashedPw = pw_to_hash (passwordToHash);
          boost::asio::post (io_eq, [hashedPw = std::move (hashedPw), completion_handler = std::move (completion_handler)] () mutable { completion_handler (hashedPw); });
        });
      },
      token, password);
}

然后调用它:

auto hashedPw = co_await async_hash (pool, io_context, createAccountObject.password, boost::asio::use_awaitable);

你好像没有使用协程 ts 所以我认为你必须这样做

async_hash (pool, io_context, createAccountObject.password, /*your lambda here*/);

I should be able to call from the socket completion handler, something like:

asio::post(worker_io_context, [this]() {worker.queue.push_back(msg_)});

当然可以。

That way, I'm at least sure that the worker queue is not used concurently. But I'm not sure if I'm allowed to post from one io_context to the other,

io_context 不是魔术。它们基本上是协作任务队列。

and also if I won't create another race condition this way.

我不会坐在这里不看你的代码就下定论(反正我可能不想读完所有代码),但让我重复一遍:io_context 不是魔法。您可以按照您已经知道的线程、任务和资源方面的方式对它们进行推理。

I also don't really understand where the memory for my message should be located, especially "in between" the transfer from one io_context to the other. Is it required I pass the message by value (since this.msg_ can be modified before the post handler is executed) ?

是的。的确。像

post(worker_io_context, [this, msg=std::move(msg_)]() {worker.queue.push_back(std::move(msg)); });

如果移动不便宜,可以选择使用重新计数的智能指针(如 shared_ptr)。如果您实际上在线程之间共享所有权,请考虑将其设为 smartpointer<T const>


洗澡的想法:也许你可以不用“工人”队列。由于您正在转向反应堆式异步(使用 Asio),您可能会专注于对任务进行排队,而不是数据。不这样做的原因包括当你想要优先排队时,负载 balancing/back 压力等。[原则上所有这些都可以使用自定义执行程序来实现,但我会坚持我之前知道的这样做。]

您不需要超过一个 io_context,即使对于多线程应用程序,您也可以只使用一个 io_context。您想使您的 SocketReader 成为共享指针,每次发生读取时,向其添加一个计数。我假设接受器、套接字创建和 some_io_context.run() 部分已完成。我会做这样的事情:

class SocketReader 
   : public std::enable_shared_from_this<SocketReader> // we need this! 
{
public:
    // Constructor
    SocketReader(io_context& ctx) 
    : ctx_(ctx)
    {
    }
    
    // read
    void do_read_body()
    {
        auto self(this->shared_from_this()); // we need this!
        asio::async_read(socket_,
            asio::buffer(msg_, 10),
            [this](asio::error_code ec, std::size_t length)
            {
                if (!ec)
                {
                    // later edit - @sehe is spot on -> is better to move it
                    asio::post(ctx_, [this, msg=std::move(msg_)]() { // do the long work });
                    do_read_body();
                }
                else
                {
                    socket_.shutdown(asio::tcp::socket::shutdown_send, ec);
                    socket_.close();
                }
            });
    }
    
private:
    
    io_context& ctx_; // l.e. ctx_ must be a reference
    vector<uint8_t> msg_;
    asio::tcp::socket _socket;
};

...
// somewhere in the code
auto s_reader = std::make_shared<SocketReader>(some_io_context);
s_reader->do_read_body();