为什么在我的代码中并行处理异步 boost::asio 操作时 sanitizer 会发出警告?

Why does sanitizer give warnings when processing asynchronous boost::asio operations in parallel in my code?

我决定用各种 sanitizer 测试我使用 boost::asio 编写的项目(我在多个线程中运行 io_service::run)。ThreadSanitizer 报告了数据竞争:io_service 内部某处正在检查套接字的 is_open,而与此同时该套接字又因超时被关闭。我设法用一小段代码复现了这个问题,但不明白为什么会这样。难道用 boost::asio::io_service::strand 构建调用链,不就应该让 class D(见下面的例子)相对于其他线程同步吗?请帮忙,谢谢!

#include <boost/asio.hpp>
#include <boost/asio/deadline_timer.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/read_until.hpp>
#include <boost/asio/streambuf.hpp>
#include <boost/asio/strand.hpp>
#include <boost/thread.hpp>
#include <boost/asio/write.hpp>
#include <boost/bind.hpp>
#include <iostream>
#include <list>
#include <thread>

using namespace boost::asio;
io_service service;
struct D {
    explicit D(io_service & ios) : socket_(ios), timeout_(ios), strand_(ios), resolver_(ios) {
        do_response();
    }
private:
    void do_response(){
        resolver_.async_resolve("tracker.dler.com",  "6969",
                                strand_.wrap( [this](boost::system::error_code const & ec,
                                                      ip::tcp::resolver::iterator endpoints){
                                    if (!ec) {
                                        do_connect(std::move(endpoints));
                                        timeout_.async_wait([this](boost::system::error_code const & ec) {
                                            if (!ec) {
                                                deadline();
                                            }
                                        });
                                    } else {
                                        socket_.close();
                                        std::cerr << "ec == true" << std::endl;
                                    }
                                }));
    }
    void do_connect(ip::tcp::resolver::iterator endpoint){
        async_connect(socket_, std::move(endpoint),
                          strand_.wrap([this](boost::system::error_code const & ec, [[maybe_unused]] const ip::tcp::resolver::iterator&){
                              std::this_thread::sleep_for(std::chrono::milliseconds(100));
                              if (!ec) {
                                  std::cout << "OK!" << std::endl;
                                  timeout_.cancel();
                              }
                          }));
        timeout_.expires_from_now(boost::posix_time::seconds(1));
    }
    void deadline(){
        if (timeout_.expires_at() <=  deadline_timer::traits_type::now()) {
            socket_.close();
            std::cerr << "TIMEOUT" << std::endl;
        } else {
            timeout_.async_wait(strand_.wrap([this](boost::system::error_code const &ec) {
                if (!ec) {
                    deadline();
                }
            }));
        }
    }

    ip::tcp::socket socket_;
    deadline_timer timeout_;
    io_service::strand strand_;
    ip::tcp::resolver resolver_;
};
/// Debug helper (not called anywhere in this example).
/// Prints `i` together with the id of the calling thread.
void func(int i)
{
    auto& out = std::cout;
    out << "func called, i= " << i << "/";
    out << boost::this_thread::get_id() << std::endl;
}
void worker_thread()
{
    service.run();
}
int main(int /*argc*/, char* /*argv*/[])
{
    // Every D hands `this` to its async handlers. std::list never relocates
    // its elements, so those pointers stay valid.
    std::list<D> ds;
    for ( int i = 0; i < 15; ++i) {
        ds.emplace_back(service);
    }
    boost::thread_group threads;
    for ( int i = 0; i < 3; ++i)
        threads.create_thread(worker_thread);
    // join_all() already blocks until every worker's service.run() has
    // returned, so the former 500 ms sleep here did nothing useful.
    threads.join_all();
}

Sanitizer 输出(一次访问来自 is_open,另一次来自 deadline):

reactive_socket_service_base.hpp
Data race (pid=7375)
Read of size 4 at 0x7b34000001b8 by thread T2:
    0x415147 boost::asio::detail::reactive_socket_service_base::is_open const reactive_socket_service_base.hpp:89
    0x437f3c boost::asio::basic_socket::is_open const basic_socket.hpp:489
    0x435c0b boost::asio::detail::iterator_connect_op::operator() connect.hpp:550
    0x445036 boost::asio::detail::binder1::operator() bind_handler.hpp:65
    0x449677 boost::asio::detail::rewrapped_handler::operator() wrapped_handler.hpp:191
    0x449564 (cpptorrent+0x449563) handler_invoke_hook.hpp:69
    0x449378 boost_asio_handler_invoke_helpers::invoke<…> handler_invoke_helpers.hpp:37
    0x448f5f boost::asio::detail::asio_handler_invoke<…> wrapped_handler.hpp:275
    0x448c35 boost_asio_handler_invoke_helpers::invoke<…> handler_invoke_helpers.hpp:37
    0x449318 boost::asio::detail::handler_work::complete<…> handler_work.hpp:100
    0x448e9e boost::asio::detail::completion_handler::do_complete completion_handler.hpp:70
    0x4489ae boost::asio::detail::strand_service::dispatch<…> strand_service.hpp:88
    0x448349 boost::asio::io_context::strand::initiate_dispatch::operator()<…> const io_context_strand.hpp:343
    0x447e0f boost::asio::async_result::initiate<…> async_result.hpp:82
    0x44728e boost::asio::async_initiate<…> async_result.hpp:257
    0x4468c4 boost::asio::io_context::strand::dispatch<…> io_context_strand.hpp:189
    0x44583d boost::asio::detail::asio_handler_invoke<…> wrapped_handler.hpp:232
    0x444f7e boost_asio_handler_invoke_helpers::invoke<…> handler_invoke_helpers.hpp:37
    0x4443d0 boost::asio::detail::asio_handler_invoke<…> connect.hpp:612
    0x443f36 boost_asio_handler_invoke_helpers::invoke<…> handler_invoke_helpers.hpp:37
    0x443910 boost::asio::detail::asio_handler_invoke<…> bind_handler.hpp:106
    0x4430de boost_asio_handler_invoke_helpers::invoke<…> handler_invoke_helpers.hpp:37
    0x4426d6 boost::asio::detail::io_object_executor::dispatch<…> const io_object_executor.hpp:119
    0x441148 boost::asio::detail::handler_work::complete<…> handler_work.hpp:72
    0x43fbd6 boost::asio::detail::reactive_socket_connect_op::do_complete reactive_socket_connect_op.hpp:102
    0x40b513 boost::asio::detail::scheduler_operation::complete scheduler_operation.hpp:40
    0x412d5f boost::asio::detail::scheduler::do_run_one scheduler.ipp:447
    0x412543 boost::asio::detail::scheduler::run scheduler.ipp:200
    0x413581 boost::asio::io_context::run io_context.ipp:63
    0x4087ee worker_thread main.cpp:82
    0x44af46 boost::detail::thread_data::run thread.hpp:120
    0x44ef6c thread_proxy
Previous write of size 4 at 0x7b34000001b8 by thread T3:
    0x41520e boost::asio::detail::reactive_socket_service_base::construct reactive_socket_service_base.ipp:45
    0x4154d8 boost::asio::detail::reactive_socket_service_base::close reactive_socket_service_base.ipp:127
    0x4222f9 boost::asio::basic_socket::close basic_socket.hpp:507
    0x41bca1 D::deadline main.cpp:62
    0x41b8b9 <lambda#1>::operator() const main.cpp:39
    0x441d16 boost::asio::detail::binder1::operator() bind_handler.hpp:65
    0x44039a (cpptorrent+0x440399) handler_invoke_hook.hpp:69
    0x43ef12 boost_asio_handler_invoke_helpers::invoke<…> handler_invoke_helpers.hpp:37
    0x43dbbd boost::asio::detail::asio_handler_invoke<…> bind_handler.hpp:106
    0x43cb52 boost_asio_handler_invoke_helpers::invoke<…> handler_invoke_helpers.hpp:37
    0x43bc18 boost::asio::detail::io_object_executor::dispatch<…> const io_object_executor.hpp:119
    0x43a854 boost::asio::detail::handler_work::complete<…> handler_work.hpp:72
    0x439206 boost::asio::detail::wait_handler::do_complete wait_handler.hpp:73
    0x40b513 boost::asio::detail::scheduler_operation::complete scheduler_operation.hpp:40
    0x412d5f boost::asio::detail::scheduler::do_run_one scheduler.ipp:447
    0x412543 boost::asio::detail::scheduler::run scheduler.ipp:200
    0x413581 boost::asio::io_context::run io_context.ipp:63
    0x4087ee worker_thread main.cpp:82
    0x44af46 boost::detail::thread_data::run thread.hpp:120
    0x44ef6c thread_proxy
Location is heap block of size 208 at 0x7b34000001a0 allocated by main thread:
    0x7f0da81022af operator new
    0x4381e1 __gnu_cxx::new_allocator::allocate new_allocator.h:115
    0x435fa4 std::allocator_traits::allocate alloc_traits.h:460
    0x434014 std::_List_base::_M_get_node stl_list.h:442
    0x42fb5f std::list::_M_create_node<…> stl_list.h:634
    0x429629 std::list::_M_insert<…> stl_list.h:1911
    0x422d5b std::list::emplace_back<…> stl_list.h:1227
    0x40885d main main.cpp:88
Thread T2 (tid=7380, running) created by main thread at:
    0x7f0da80d5466 pthread_create
    0x44e0ca boost::thread::start_thread_noexcept
    0x429715 boost::thread::thread<…> thread.hpp:269
    0x422de0 boost::thread_group::create_thread<…> thread_group.hpp:79
    0x408893 main main.cpp:92
Thread T3 (tid=7381, running) created by main thread at:
    0x7f0da80d5466 pthread_create
    0x44e0ca boost::thread::start_thread_noexcept
    0x429715 boost::thread::thread<…> thread.hpp:269
    0x422de0 boost::thread_group::create_thread<…> thread_group.hpp:79
    0x408893 main main.cpp:92
Data race (pid=7375)
reactive_socket_service_base.ipp

我无法重现问题。我已经查看了代码。

  • 你的问题中有一些死代码(func)。
  • 您没有使用最新的接口。请考虑使用 io_context,而不是已弃用的 io_service 接口。
  • 直接在 strand 执行器上(而不是在通用服务上)构造 IO 对象,可以让代码更安全、更简洁。这样默认情况下所有完成处理程序都会在 strand 上执行。
  • 等待“创建所有线程”的 500 毫秒没有任何作用,因为 thread_group 上的 join_all 已经阻塞,直到服务退出(用完工作)。
  • 考虑使用 asio::thread_pool,而不是手动组合 io_context + thread_group。两者本质相同,但前者更正确(参见例如 Should the exception thrown by boost::asio::io_service::run() be caught?)。
  • 您从未在第一次 async_wait 之前设置过期时间,在链接新的 async_wait 时也没有重置它。
  • 你用 socket.close() 硬关闭了套接字。我建议改用 cancel(),把关闭留给析构函数。这可以避免一类与套接字 fd 重用相关的错误(这类错误甚至无法作为数据竞争被检测出来,因为它们是应用程序级别的竞争)。
  • 我不太确定为什么被中止的计时器还要重新启动。难道不应该等到发起一个需要超时的新套接字操作时再启动它吗?
  • 在取消计时器之前等待 100 毫秒似乎相当适得其反。这样一来,即使连接确实及时完成了,套接字也可能被关闭?[也许这只是因为示例代码删掉了太多其他代码。]

话虽如此,下面是一个简化后的版本。我没有在您的代码中看到数据竞争;如果您补充了报告,我会去看一下。

现在,希望这篇评论能对您有所帮助:

#include <boost/asio.hpp>
#include <iostream>
#include <thread>
#include <list>

using namespace std::chrono_literals;
using boost::asio::ip::tcp;
using boost::system::error_code;
using Timer = boost::asio::steady_timer;
using std::this_thread::sleep_for;

struct D {
    template <typename Executor>
    explicit D(Executor ex) : socket_(ex)
                            , timeout_(ex)
                            , resolver_(ex)
    {
        // Post here is actually redundant because during construction, nothing
        // else will have a reference to our IO objects, but for
        // consistency/documentation value:
        post(socket_.get_executor(), [this] { do_response(); });

        // All other methods will be on the strand by definition, because
        // they're all invoked from completion handlers from our IO objects
    }

  private:
    void do_response()
    {
        resolver_.async_resolve(
            "tracker.dler.com", "6969",
            [this](error_code ec, tcp::resolver::iterator endpoints) {
                if (!ec) {
                    do_connect(std::move(endpoints));
                } else {
                    socket_.close();
                    std::cerr << "ec == true" << std::endl;
                }
            });
    }

    void do_connect(tcp::resolver::iterator endpoint)
    {
        timeout_.expires_from_now(1s);
        timeout_.async_wait([this](error_code ec) { if (!ec) { deadline(); } });
        async_connect( //
            socket_, std::move(endpoint),
            [this](error_code ec, const tcp::resolver::iterator&) {
                std::cout << "connect: " << ec.message() << " (" << socket_.remote_endpoint() << ")" << std::endl;
                timeout_.cancel();
                if (!ec) {
                    sleep_for(100ms);
                }
            });
    }
    
    void deadline()
    {
        if (timeout_.expires_at() <= std::chrono::steady_clock::now()) {
            std::cerr << "TIMEOUT" << std::endl;
            socket_.cancel();
        }
    }

    tcp::socket   socket_;
    Timer         timeout_;
    tcp::resolver resolver_;
};

int main()
{
    boost::asio::thread_pool io(3);

    std::list<D> ds;
    for (int i = 0; i < 15; ++i) {
        ds.emplace_back(make_strand(io/*.get_executor()*/));
    }

    io.join();
}