是否可以使用异步组合器创建 boost::signal2?

Is it possible to create boost::signal2 with asynchronous combiner?

对于一个项目,我尝试创建异步升压信号,它似乎有效,但 valgrind 告诉我相反。

在下面的示例中,您可以看到基本的实现和用法。

对于这个例子,我需要一个异步信号,因为信号是在 SET 函数中触发的,它锁定了互斥量,而槽试图调用 GET,它也锁定了互斥量。是的,我可以在信号调用之前调用 mutex.unlock(),但从我的项目来看,它有点复杂,因为我不想冒险阻塞更新数据的过程,这些过程可能会很慢。

那么,是否可以使用 boost 创建异步信号?如果是这样,有人可以让我在没有 valgrind 错误的情况下工作吗?

我试着看一下 boost 源代码,但我不知道如何解决我的问题。我的lambda,在Combiner里,一个副本一个iterator,但是valgrind不够用。

我尽量让示例尽可能小,但是 valgrind 错误很大,抱歉。

我正在使用:

// test_signals.cpp
// Compilation : g++ -std=gnu++17 -o test_signal test_signals.cpp -O0 -g -pthread
// Valgrind : valgrind --trace-children=yes --leak-check=full --track-origins=yes --log-file=valgrind.log ./test_signal
#include <boost/asio/post.hpp>
#include <boost/asio/thread_pool.hpp>
#include <boost/signals2/signal.hpp>
#include <iostream>
#include <mutex>

using ThreadPool = std::shared_ptr<::boost::asio::thread_pool>;

struct AsyncSignalCombiner
{
    typedef void result_type;

    AsyncSignalCombiner(ThreadPool pool)
        : thread_pool(pool){};
    AsyncSignalCombiner(const AsyncSignalCombiner &) = default;

    template <typename InputIterator>
    result_type operator()(InputIterator first, InputIterator last) const
    {
        while (first != last)
        {
            ::boost::asio::post(*thread_pool, [=]() { *first; });

            ++first;
        }
    }

    ThreadPool thread_pool{nullptr};
};

class SignalASync : public boost::signals2::signal<void(), AsyncSignalCombiner>
{
public:
    explicit SignalASync(ThreadPool thread_pool)
        : boost::signals2::signal<void(), AsyncSignalCombiner>(
              AsyncSignalCombiner{thread_pool}){};
};

class A
{
    std::mutex mutex_;

public:
    A(ThreadPool pool)
        : changed_{pool} {};
    int get()
    {
        std::lock_guard lock{mutex_};
        return 42;
    }

    void set()
    {
        std::lock_guard lock{mutex_};
        changed_();
    }

    SignalASync changed_;
};

int main()
{
    auto pool = std::make_shared<boost::asio::thread_pool>(1);
    A data{pool};

    auto slot = [&]() { std::cout << "slot: " << data.get() << std::endl; };

    data.changed_.connect(slot);

    data.set();

    pool->join();

    return 0;
}

还有 valgrind 的错误:(对不起,我只在 5 上放了一个错误,因为它们对 Whosebug 来说太大了,但它们都是一样的,无效 read/write on cache->result,slot_call_iterator.cpp 第 107、110 和 119 行)

==69966== Thread 2:
==69966== Invalid read of size 1
==69966==    at 0x126B6A: boost::optional_detail::optional_base<boost::signals2::detail::void_type>::is_initialized() const (optional.hpp:396)
==69966==    by 0x1251EF: boost::optional<boost::signals2::detail::void_type>::operator!() const (optional.hpp:1446)
==69966==    by 0x123759: boost::signals2::detail::slot_call_iterator_t<boost::signals2::detail::variadic_slot_invoker<boost::signals2::detail::void_type>, std::_List_iterator<boost::shared_ptr<boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> > >, boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> >::dereference() const (slot_call_iterator.hpp:107)
==69966==    by 0x121DEB: boost::signals2::detail::slot_call_iterator_t<boost::signals2::detail::variadic_slot_invoker<boost::signals2::detail::void_type>, std::_List_iterator<boost::shared_ptr<boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> > >, boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> >::reference boost::iterators::iterator_core_access::dereference<boost::signals2::detail::slot_call_iterator_t<boost::signals2::detail::variadic_slot_invoker<boost::signals2::detail::void_type>, std::_List_iterator<boost::shared_ptr<boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> > >, boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> > >(boost::signals2::detail::slot_call_iterator_t<boost::signals2::detail::variadic_slot_invoker<boost::signals2::detail::void_type>, std::_List_iterator<boost::shared_ptr<boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> > >, boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> > const&) (iterator_facade.hpp:550)
==69966==    by 0x11FCC7: boost::iterators::detail::iterator_facade_base<boost::signals2::detail::slot_call_iterator_t<boost::signals2::detail::variadic_slot_invoker<boost::signals2::detail::void_type>, std::_List_iterator<boost::shared_ptr<boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> > >, boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> >, boost::signals2::detail::void_type, boost::iterators::single_pass_traversal_tag, boost::signals2::detail::void_type const&, long, false, false>::operator*() const (iterator_facade.hpp:656)
==69966==    by 0x11DFE6: void AsyncSignalCombiner::operator()<boost::signals2::detail::slot_call_iterator_t<boost::signals2::detail::variadic_slot_invoker<boost::signals2::detail::void_type>, std::_List_iterator<boost::shared_ptr<boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> > >, boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> > >(boost::signals2::detail::slot_call_iterator_t<boost::signals2::detail::variadic_slot_invoker<boost::signals2::detail::void_type>, std::_List_iterator<boost::shared_ptr<boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> > >, boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> >, boost::signals2::detail::slot_call_iterator_t<boost::signals2::detail::variadic_slot_invoker<boost::signals2::detail::void_type>, std::_List_iterator<boost::shared_ptr<boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> > >, boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> >) const::{lambda()#1}::operator()() const (test_signals.cpp:25)
==69966==    by 0x1297DE: void boost::asio::asio_handler_invoke<void AsyncSignalCombiner::operator()<boost::signals2::detail::slot_call_iterator_t<boost::signals2::detail::variadic_slot_invoker<boost::signals2::detail::void_type>, std::_List_iterator<boost::shared_ptr<boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> > >, boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> > >(boost::signals2::detail::slot_call_iterator_t<boost::signals2::detail::variadic_slot_invoker<boost::signals2::detail::void_type>, std::_List_iterator<boost::shared_ptr<boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> > >, boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> >, boost::signals2::detail::slot_call_iterator_t<boost::signals2::detail::variadic_slot_invoker<boost::signals2::detail::void_type>, std::_List_iterator<boost::shared_ptr<boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> > >, boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> >) const::{lambda()#1}>(void AsyncSignalCombiner::operator()<boost::signals2::detail::slot_call_iterator_t<boost::signals2::detail::variadic_slot_invoker<boost::signals2::detail::void_type>, std::_List_iterator<boost::shared_ptr<boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> > >, boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> > >(boost::signals2::detail::slot_call_iterator_t<boost::signals2::detail::variadic_slot_invoker<boost::signals2::detail::void_type>, std::_List_iterator<boost::shared_ptr<boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> > >, boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> >, boost::signals2::detail::slot_call_iterator_t<boost::signals2::detail::variadic_slot_invoker<boost::signals2::detail::void_type>, std::_List_iterator<boost::shared_ptr<boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> > >, boost::signals2::detail::connection_body<std::pair<boost::signals2::detail::slot_meta_group, boost::optional<int> >, boost::signals2::slot<void (), boost::function<void ()> >, boost::signals2::mutex> >) const::{lambda()#1}&, ...) (handler_invoke_hook.hpp:69)
[...]
==69966==  Address 0x1ffefffb30 is on thread 1's stack
==69966==  144 bytes below stack pointer

感谢您阅读我的文章,可能需要时间来回答我,这是我第一次 post 在这里。

编辑 20/01/21 : 问题来自 slot_call_iterator_t(在 boost/signals2/detail/slot_call_iterator.hpp 中),他们将“cache_type”引用但按地址 (cache_type*) 存储它,因此当将 slot_call_iterator_t 的副本提供给线程 2 时,cache_type* 指的是线程 1 上的堆栈地址。

但我暂时不知道如何解决它。

嗯,代码应该做什么?

*first;

这个语句在概念上没有做任何事情。它取消引用迭代器。回调结果的迭代器。

认为你正试图异步调度信号,但你在组合器中为时已晚。迭代器的取消引用 封装了 执行,但迭代器在组合器的范围之外无效。

所以这行不通。

一些直接的答案

从技术上回答你关于如何修复悬空引用的问题:

while (first != last) {
    auto value = *first++;
    post(*thread_pool, [=] { value; });
}

显然,这只是反驳了你的组合器的目的,因为现在评估是在信号线程上进行的。

旁注:执行者

使用执行程序而不是绕过线程池,这样您就可以抽象出执行上下文的类型。下面的例子应该清楚我的意思。

可能的解决方案#1

您可以拦截连接,而不是拦截执行:

Live On Wandbox

// test_signals.cpp
// Compilation : g++ -std=gnu++17 -o test_signal test_signals.cpp -O0 -g
// -pthread Valgrind :
// valgrind --trace-children=yes --leak-check=full --track-origins=yes --log-file=valgrind.log ./test_signal
#include <boost/signals2/signal.hpp>
#include <iostream>
#include <mutex>
#include <utility>

#include <boost/asio.hpp>

template <typename Ex>
struct SignalASync : protected boost::signals2::signal<void()> {
    using Base = boost::signals2::signal<void()>;

    template <typename... Args>
    SignalASync(Ex ex, Args&&... args)
     : Base(std::forward<Args>(args)...), _ex(ex) {}

    using Base::operator=;
    using Base::operator();

    template <typename F, typename... Args>
    auto connect(F&& f, Args&&... other) {
        return Base::connect(
            Binder<F>(_ex, std::forward<F>(f)),
            std::forward<Args>(other)...);
    }

  private:
    template <typename F> struct Binder {
        Binder(Ex ex, F&& f) : _ex(ex), _f(std::forward<F>(f)) {}

        template <typename... Args>
        void operator()(Args&&... args) const {
            auto argtup = std::make_tuple(std::forward<Args>(args)...);
            post(_ex, [=] { std::apply(_f, argtup); });
        }

        Ex _ex;
        F _f;
    };

    Ex _ex;
};

template <typename Ex>
class A {
    std::mutex mutex_;

  public:
    A(Ex ex) : changed_{ ex } {}

    int get() {
        std::lock_guard lock{ mutex_ };
        return 42;
    }

    void set() {
        std::lock_guard lock{ mutex_ };
        changed_();
    }

    SignalASync<Ex> changed_;
};

int main() {
    boost::asio::thread_pool pool(2);
    A data{ pool.get_executor() };

    auto slot = [&] { std::cout << "thread_id: "  << std::this_thread::get_id() << " slot: " << data.get() << std::endl; };

    std::cout << "main thread id: "  << std::this_thread::get_id() << std::endl;

    data.changed_.connect(slot);

    data.set();

    pool.join();
}

打印

main thread id: 139941975648128
thread_id: 139941863012096 slot: 42

并在 valgrind 和 ubsan/asan 下干净地运行。

更多建议

  1. 也许你可以改变一些东西,这样你就不需要 Executor 模板参数,通过使用 boost::asio::any_io_executor: Live On Wandbox

    请注意,这可能效率较低。

  2. 此外,您可能会完全取消 Signal 子类,因为它需要您提供更多接口(如 operator+= 等)。相反,只需创建一个包装函数:

    Live On Wandbox 46行代码!

    // test_signals.cpp
    // Compilation : g++ -std=gnu++17 -o test_signal test_signals.cpp -O0 -g
    // -pthread Valgrind :
    // valgrind --trace-children=yes --leak-check=full --track-origins=yes --log-file=valgrind.log ./test_signal
    #include <boost/signals2/signal.hpp>
    #include <iostream>
    #include <mutex>
    #include <utility>
    
    #include <boost/asio.hpp>
    
    using Signal = boost::signals2::signal<void()>;
    
    class A {
        std::mutex mutex_;
    
      public:
        int get() {
            std::lock_guard lock{ mutex_ };
            return 42;
        }
    
        void set() {
            std::lock_guard lock{ mutex_ };
            changed_();
        }
    
        Signal changed_;
    };
    
    int main() {
        boost::asio::thread_pool pool(2);
        auto on_pool = [ex = pool.get_executor()](auto f) { 
            return [ex,f] { post(ex, f); };
        };
    
        A data;
        auto slot = [&] { std::cout << "thread_id: "  << std::this_thread::get_id() << " slot: " << data.get() << std::endl; };
    
        std::cout << "main thread id: "  << std::this_thread::get_id() << std::endl;
    
        data.changed_.connect(on_pool(slot));
        data.set();
    
        pool.join();
    }
    

    版画

    main thread id: 139851977009024
    thread_id: 139851872274176 slot: 42
    
  3. 如果你不喜欢这样,因为很容易忘记用 on_pool 换行,请将换行移到 A 中。将 change_ 设为私有,而不是添加 on_change 方法:

    Live On Wandbox

    As A Bonus, this also makes the signal less racy, by passing the actual value directly to the signal handler.

    // test_signals.cpp
    // Compilation : g++ -std=gnu++17 -o test_signal test_signals.cpp -O0 -g
    // -pthread Valgrind :
    // valgrind --trace-children=yes --leak-check=full --track-origins=yes --log-file=valgrind.log ./test_signal
    #include <boost/signals2/signal.hpp>
    #include <iostream>
    #include <mutex>
    #include <utility>
    #include <boost/asio.hpp>
    
    using Signal = boost::signals2::signal<void(int)>;
    
    class A {
        mutable std::mutex mutex_;
        boost::asio::executor ex_;
        Signal changed_;
        int value_ = 42;
    
      public:
        A(boost::asio::executor ex) : ex_(ex) {}
    
        int get() const {
            std::lock_guard lock{ mutex_ };
            return value_;
        }
    
        void set(int value) {
            {
                std::lock_guard lock{ mutex_ };
                value_ = value;
            }
            changed_(value);
        }
    
        boost::signals2::connection on_changed(Signal::slot_type const& f) {
            return changed_.connect([=](auto&&... args) {
                post(ex_, std::bind(f, std::forward<decltype(args)>(args)...));
            });
        }
    };
    
    int main() {
        boost::asio::thread_pool pool(2);
        A data { pool.get_executor() };
    
        std::cout << "main thread id: "  << std::this_thread::get_id() << std::endl;
    
        data.on_changed([&](int v) { std::cout << " slot: " << v << std::endl; });
        data.set(99);
    
        pool.join();
    }
    

    版画

    main thread id: 140379793893248
     slot: 99