理解strand的无锁用法

Understand the usage of strand without locking

参考: websocket_client_async_ssl.cpp strands

问题1>这是我的理解:

Given a few async operations bound with the same strand, the strand will guarantee that all associated async operations will be executed as a strictly sequential invocation.

这是否意味着上述所有异步操作都将由同一个线程执行? 或者它只是说在任何时候,任何可用线程只会执行一个异步操作?

问题 2> boost::asio::make_strand 函数为执行程序或执行上下文创建一个链对象。

session(net::io_context& ioc, ssl::context& ctx)
    : resolver_(net::make_strand(ioc))
    , ws_(net::make_strand(ioc), ctx)
    

在这里,resolver_ws_ 有自己的链,但我无法理解每个链如何应用于异步操作。

例如,在下面的aysnc和handler中,哪些函数(即aysnc或handler)绑定在同一条链上,不会运行同时

run
  =>resolver_.async_resolve -->session::on_resolve 
    =>beast::get_lowest_layer(ws_).async_connect -->session::on_connect
      =>ws_.next_layer().async_handshake --> session::on_ssl_handshake
        =>ws_.async_handshake --> session::on_handshake

异步=================================处理程序

问题 3> 我们如何从执行者那里取回链? 这两者有什么区别吗?

get_associated_executor get_executor

io_context::get_executor: Obtains the executor associated with the io_context.

get_associated_executor: Helper function to obtain an object's associated executor.

问题 4> 我使用以下方法将 deadline_timer 绑定到 io_context 以防止竞争条件是否正确?

代码的所有其他部分与websocket_client_async_ssl.cpp的示例相同。

session(net::io_context& ioc, ssl::context& ctx)
    : resolver_(net::make_strand(ioc))
    , ws_(net::make_strand(ioc), ctx),
    d_timer_(ws_.get_executor())
{     }

void on_heartbeat_write( beast::error_code ec, std::size_t bytes_transferred)
{
  d_timer_.expires_from_now(boost::posix_time::seconds(5));
  d_timer_.async_wait(beast::bind_front_handler( &session::on_heartbeat, shared_from_this()));
}

void on_heartbeat(const boost::system::error_code& ec)
{
    ws_.async_write( net::buffer(text_ping_), beast::bind_front_handler( &session::on_heartbeat_write, shared_from_this()));
}

void on_handshake(beast::error_code ec)
{
    d_timer_.expires_from_now(boost::posix_time::seconds(5));
    d_timer_.async_wait(beast::bind_front_handler( &session::on_heartbeat, shared_from_this()));
    ws_.async_write(net::buffer(text_), beast::bind_front_handler(&session::on_write, shared_from_this()));
}

注意: 我使用 d_timer_(ws_.get_executor()) 来初始化 deadline_timer 并希望它能确保他们不会同时写入或读取 websocket。 这是正确的做法吗?

问题 1

Does this mean that all above async operations will be executed by a same thread? Or it just says that at any time, only one async operation will be executed by any available thread?

后者。

问题 2

Here, resolver_ and ws_ have its own strand,

让我插一句,我认为示例中的内容造成了不必要的混淆。他们可以(理论上应该)使用相同的链,但我猜他们不想经历存储链的麻烦。我可能会写:

explicit session(net::io_context& ioc, ssl::context& ctx)
    : resolver_(net::make_strand(ioc))
    , ws_(resolver_.get_executor(), ctx) {}

启动函数称为由您决定。完成处理程序在属于您调用操作的 IO 对象的执行程序上 dispatch-ed,除非 完成处理程序绑定到不同的执行程序(例如使用bind_executor,参见 get_associated_exectutor)。到目前为止,在现代 Asio 的大多数情况下,您将 不会 绑定处理程序,而是将“绑定 IO 对象”“绑定”到适当的执行程序。这使得输入更少,更难忘记。

所以实际上,链 中的所有 async-initiations 除了 run() 中的 都在一条链上,因为 IO 对象是绑定到 strand 执行器。

必须记住,当一些 外部 用户呼叫你的 classes (例如经常 stop)。因此,制定一个公约是个好主意。我会亲自制作所有“不安全”的方法和成员 private:,所以我经常会有像这样的对:

  public:
    void stop() {
        dispatch(strand_, [self=shared_from_this()] { self->do_stop(); });
    }

  private:
    void do_stop() {
        beast::get_lowest_layer(ws_).cancel();
    }

Side Note:

In this particular example, there is only one (main) thread running/polling the io service, so the whole point is moot. But as I explained recently (), the examples are here to show some common patterns that allow one to do "real life" work as well

奖励:处理程序跟踪

让我们使用 BOOST_ASIO_ENABLE_HANDLER_TRACKING 来获得一些见解。¹ 运行 示例会话显示类似

稍微眯一下,可以看到所有的strand executor都是一样的:

0*1|resolver@0x559785a03b68.async_resolve
1*2|strand_executor@0x559785a02c50.execute
2*3|socket@0x559785a05770.async_connect
3*4|strand_executor@0x559785a02c50.execute
4*5|socket@0x559785a05770.async_send
5*6|strand_executor@0x559785a02c50.execute
6*7|socket@0x559785a05770.async_receive
7*8|strand_executor@0x559785a02c50.execute
8*9|socket@0x559785a05770.async_send
9*10|strand_executor@0x559785a02c50.execute
10*11|socket@0x559785a05770.async_receive
11*12|strand_executor@0x559785a02c50.execute
12*13|deadline_timer@0x559785a05958.async_wait
12*14|socket@0x559785a05770.async_send
14*15|strand_executor@0x559785a02c50.execute
15*16|socket@0x559785a05770.async_receive
16*17|strand_executor@0x559785a02c50.execute
17*18|socket@0x559785a05770.async_send
13*19|strand_executor@0x559785a02c50.execute
18*20|strand_executor@0x559785a02c50.execute
20*21|socket@0x559785a05770.async_receive
21*22|strand_executor@0x559785a02c50.execute
22*23|deadline_timer@0x559785a05958.async_wait
22*24|socket@0x559785a05770.async_send
24*25|strand_executor@0x559785a02c50.execute
25*26|socket@0x559785a05770.async_receive
26*27|strand_executor@0x559785a02c50.execute
23*28|strand_executor@0x559785a02c50.execute

问题 3

How can we retrieve the strand from executor?

你没有[*]。但是 make_strand(s) returns 如果 s 已经是一条链,则为等效链。

[*] 默认情况下,Asio 的 IO 对象使用 type-erased 执行器(asio::executorasio::any_io_executor 取决于版本)。所以从技术上讲,你 可以 询问它的 target_type() ,并且在将类型 id 与某些预期类型进行比较后,使用 target<net::strand<net::io_context::executor_type>>() 之类的东西来访问原始类型,但实际上没用。您不想检查实施细节。只需尊重处理程序(通过像 Asio 那样将它们分派给相关的执行程序)。

Is there any difference between these two? get_associated_executor get_executor

get_executor 从 IO 对象获取一个拥有的执行器。它是一个成员函数。

asio::get_associated_executor 从处理程序对象中获取关联的执行程序。您会观察到 get_associated_executor(ws_) 无法编译(尽管某些 IO 对象可能 satisfy the criteria 允许它工作)。

问题 4

Is it correct that I use the following method to bind deadline_timer to io_context

您会注意到您做了与我上面已经提到的相同的操作,将定时器 IO 对象绑定到同一个链执行器。所以,荣誉。

to prevent race condition?

您在这里没有阻止竞争条件。您可以防止 数据竞争 。那是因为在 on_heartbeat 中,您 访问 ws_ 对象,它是 class 的一个实例,它不是线程安全的。实际上,您正在共享对 non-threadsafe 资源的访问权限,并且您需要 序列化 访问权限,因此您希望处于所有其他访问权限也处于开启状态的链上。

Note: [...] and hoped that it will make sure they don't write or read the websocket at the same time. Is this the right way to do it?

是的,这是一个好的开始,但还不够。

首先,您可以同时写入或读取,只要

  • 写入操作不重叠
  • 读取操作不重叠
  • 对 IO 对象的访问已安全序列化。

特别是,您的 on_heartbeat 可能已安全序列化,因此您不会在调用 async_write 启动函数时发生数据争用。但是,您需要进行更多检查以了解写入操作是否已经(仍在)进行中。实现这一目标的一种方法是拥有一个包含传出消息的队列。如果你对心跳要求严格,负载高,你可能需要一个priority-queue这里。


¹ 我通过将流类型替换为 Asio 本机 ssl::stream<tcp::socket> 来简化示例。这意味着我们没有得到处理 tcp_stream 过期的所有内部计时器。参见 https://pastebin.ubuntu.com/p/sPRYh6Xbwz/