boost::asio::bind_executor 不在链中执行

boost::asio::bind_executor does not execute in strand

以下示例在没有断言的情况下完成:

#include <cassert>
#include <functional>
#include <future>
#include <thread>
#include <boost/asio.hpp>

class example1
{
public:
    typedef boost::asio::io_context io_context;
    typedef boost::asio::io_context::executor_type executor_type;
    typedef boost::asio::strand<executor_type> strand;
    typedef boost::asio::executor_work_guard<executor_type> work_guard;
    typedef std::function<void()> handler;

    example1()
      : work_(boost::asio::make_work_guard(context_)),
        thread_([this]() { context_.run(); }),
        strand1_(context_.get_executor()),
        strand2_(context_.get_executor())
    {

    }

    ~example1()
    {
        assert(result_.get_future().get());
        work_.reset();
        thread_.join();
    }

    void invoke()
    {
        handler handle = boost::asio::bind_executor(strand2_,
            std::bind(&example1::strand2_handler, this));

        boost::asio::post(strand1_,
            std::bind(&example1::strand1_handler, this, handle));
    }

    void strand1_handler(handler handle)
    {
        assert(strand1_.running_in_this_thread());
        handle();
    }

    void strand2_handler()
    {
        assert(strand1_.running_in_this_thread());
        ////assert(strand2_.running_in_this_thread());
        result_.set_value(true);
    }

private:
    io_context context_;
    work_guard work_;
    std::thread thread_;
    strand strand1_;
    strand strand2_;
    std::promise<bool> result_;
};

int main()
{
    example1 test{};
    test.invoke();
}

但是我的期望是注释掉的断言应该成功,而不是直接在它上面的断言。根据 strand::running_in_this_thread(),处理程序 handle 已在调用者的链中被调用,而不是提供给 bind_executor

我可以使用中间方法解决这个问题,如下所示。

class example2
{
public:
    typedef boost::asio::io_context io_context;
    typedef boost::asio::io_context::executor_type executor_type;
    typedef boost::asio::strand<executor_type> strand;
    typedef boost::asio::executor_work_guard<executor_type> work_guard;
    typedef std::function<void()> handler;

    example2()
      : work_(boost::asio::make_work_guard(context_)),
        thread_([this]() { context_.run(); }),
        strand1_(context_.get_executor()),
        strand2_(context_.get_executor())
    {

    }

    ~example2()
    {
        assert(result_.get_future().get());
        work_.reset();
        thread_.join();
    }

    void invoke()
    {
        handler handle =
            std::bind(&example2::do_strand2_handler, this);

        boost::asio::post(strand1_,
            std::bind(&example2::strand1_handler, this, handle));
    }

    void strand1_handler(handler handle)
    {
        assert(strand1_.running_in_this_thread());
        handle();
    }

    // Do the job of bind_executor.
    void do_strand2_handler()
    {
        boost::asio::post(strand2_,
            std::bind(&example2::strand2_handler, this));
    }

    void strand2_handler()
    {
        ////assert(strand1_.running_in_this_thread());
        assert(strand2_.running_in_this_thread());
        result_.set_value(true);
    }

private:
    io_context context_;
    work_guard work_;
    std::thread thread_;
    strand strand1_;
    strand strand2_;
    std::promise<bool> result_;
};

int main()
{
    example2 test2{};
    test2.invoke();
}

但避免这种情况大概是 bind_executor。这是一个提升错误还是我遗漏了什么?我已尝试通过 boost::asio 来源进行跟踪,但无济于事。

更新

感谢@sehe 的帮助。上述问题可以通过多种方式解决,例如:

class example3
{
public:
    typedef boost::asio::io_context io_context;
    typedef boost::asio::io_context::executor_type executor_type;
    typedef boost::asio::strand<executor_type> strand;
    typedef boost::asio::executor_work_guard<executor_type> work_guard;
    typedef boost::asio::executor_binder<std::function<void()>,
        boost::asio::any_io_executor> handler;

    example3()
      : work_(boost::asio::make_work_guard(context_)),
        thread_([this]() { context_.run(); }),
        strand1_(context_.get_executor()),
        strand2_(context_.get_executor())
    {
    }

    ~example3()
    {
        assert(result_.get_future().get());
        work_.reset();
        thread_.join();
    }

    void invoke()
    {
        auto handle = boost::asio::bind_executor(strand2_,
            std::bind(&example3::strand2_handler, this));

        boost::asio::post(strand1_,
            std::bind(&example3::strand1_handler, this, handle));
    }

    void strand1_handler(handler handle)
    {
        assert(strand1_.running_in_this_thread());
        boost::asio::dispatch(handle);
    }

    void strand2_handler()
    {
        assert(strand2_.running_in_this_thread());
        result_.set_value(true);
    }

private:
    io_context context_;
    work_guard work_;
    std::thread thread_;
    strand strand1_;
    strand strand2_;
    std::promise<bool> result_;
};

int main
{
    example3 test3{};
    test3.invoke();
}

是的,您确实漏掉了一些东西。实际上有两件事。

类型擦除

绑定执行器不会修改函数,它会修改其类型。

但是,通过使用 std::function<> 擦除可调用对象的类型,您隐藏了绑定的执行程序。你可以很容易地确定这一点:

erased_handler handle = bind_executor(s2, s2_handler);
assert(asio::get_associated_executor(handle, s1) == s1);

保留类型后问题就消失了:

auto handle = bind_executor(s2, s2_handler);
assert(asio::get_associated_executor(handle, s1) == s2);

派遣(原 handler_invoke

调用 handle 直接根据 C++ 语言语义调用它,如您所见。

要让 Asio 尊重潜在绑定的执行者,您可以使用 dispatch(或 post):

auto s1_handler = [&](auto chain) {
    assert(s1.running_in_this_thread());
    dispatch(get_associated_executor(chain, s1), chain);
};

事实上,如果您确定 chain 将有一个关联的执行程序,您可以接受默认回退(这是一个系统执行程序):

auto s1_handler = [&](auto chain) {
    assert(s1.running_in_this_thread());
    dispatch(chain);
};

把它们放在一起

在简化的扩展测试器中展示智慧:

Live On Coliru

#include <boost/asio.hpp>
#include <functional>
#include <iostream>

namespace asio = boost::asio;

int main() {
    asio::thread_pool io(1);

    auto s1 = make_strand(io), s2 = make_strand(io);
    assert(s1 != s2); // implementation defined! strands may hash equal

    auto s1_handler = [&](auto chain) {
        assert(s1.running_in_this_thread());

        // immediate invocation runs on the current strand:
        chain();

        // dispatch *might* invoke directly if already on the right strand
        dispatch(chain);                                     // 1
        dispatch(get_associated_executor(chain, s1), chain); // 2

        // posting never immediately invokes, even if already on the right
        // strand
        post(chain);                                     // 3
        post(get_associated_executor(chain, s1), chain); // 4
    };

    int count_chain_invocations = 0;
    auto s2_handler = [&] {
        if (s2.running_in_this_thread()) {
            count_chain_invocations += 1;
        } else {
            std::cout << "(note: direct C++ call ends up on wrong strand)\n";
        }
    };

    {
        using erased_handler  = std::function<void()>;
        erased_handler handle = bind_executor(s2, s2_handler);
        assert(asio::get_associated_executor(handle, s1) == s1);
    }
    {
        auto handle = bind_executor(s2, s2_handler);
        assert(asio::get_associated_executor(handle, s1) == s2);
    }

    auto handle = bind_executor(s2, s2_handler);
    post(s1, std::bind(s1_handler, handle));

    io.join();

    std::cout << "count_chain_invocations: " << count_chain_invocations << "\n";
}

所有断言均通过,输出符合预期:

(note: direct C++ call ends up on wrong strand)
count_chain_invocations: 4

奖励:如果您需要 Type-Erased 绑定可调用对象怎么办?

无论您做什么,都不要使用 std::function。不过你可以包一个;

template <typename Sig> struct ErasedHandler {
    using executor_type = asio::any_io_executor;
    std::function<Sig> _erased;
    executor_type      _ex;
    executor_type get_executor() const { return _ex; }

    template <typename F>
    explicit ErasedHandler(F&& f)
        : _erased(std::forward<F>(f))
        , _ex(asio::get_associated_executor(f)) {}

    ErasedHandler() = default;

    template <typename... Args>
    decltype(auto) operator()(Args&&... args) const {
        return _erased(std::forward<Args>(args)...);
    }
    template <typename... Args>
    decltype(auto) operator()(Args&&... args) {
        return _erased(std::forward<Args>(args)...);
    }

    explicit operator bool() const { return _erased; }
};

看到了Live On Coliru

在你这样做之前,请注意

  • 使用 any_io_executor 还会键入擦除执行程序,这可能会损害性能
  • 提供了一个很好的回退,只是使用系统执行器来处理未绑定的可调用对象。您可以通过检测它并要求显式构造函数参数等来解决这个问题,但是...
  • 所有这些仍然完全忽略了其他处理程序属性,例如 associated allocator

我可能会避免一般地存储 type-erased 可链接的处理程序。您通常可以存储由模板类型参数推导的处理程序的实际类型。

PS:事后思考

您可能期望的是这种行为:

template <typename... Args>
decltype(auto) operator()(Args&&... args) const {
    // CAUTION: NOT WHAT YOU WANT
    boost::asio::dispatch(_ex,
                          std::bind(_erased, std::forward<Args>(args)...));
}
template <typename... Args>
decltype(auto) operator()(Args&&... args) {
    // CAUTION: NOT WHAT YOU WANT
    boost::asio::dispatch(_ex,
                          std::bind(_erased, std::forward<Args>(args)...));
}

看到Live On Coliru

在此方案下,即使是直接 C++ 调用也会“做正确的事”。

看起来不错。等你想好了。[​​=31=]

问题是无法以这种方式重新启动处理程序。更具体地说,如果您有一个与“free-threaded”执行程序关联的处理程序,则执行 bind_executor(strand, f) 将没有任何效果(除了减慢您的程序),因为 f 会令人讨厌无论如何都要分派给另一个执行者。

所以不要那样做:)