ASIO:使用多条线和一条线时出现死锁 io_context

ASIO: getting deadlock when using several strands and threads with one io_context

我正在 Windows 最新版本的 ASIO 上使用 Mingw64 进行编译。

我有一个用于接受 tcp 连接的沙箱代码。我使用一个上下文,每个接受器一个链,一个套接字和 2 个线程(我在文档中读到 post 进入两个不同的链并不能保证并发调用)。出于某种原因,我在执行结束时陷入僵局,我不知道为什么会这样。如果出现以下情况则不会发生:

如果我 close acceptor 也不会发生死锁。

我未能在文档中找到任何可能解释此类行为原因的相关信息。而且我不想忽略它,因为它可能会导致不可预测的问题。

这是我的沙箱代码:


#include <asio.hpp>
#include <iostream>
#include <sstream>
#include <functional>

constexpr const char localhost[] = "127.0.0.1";
constexpr unsigned short port = 12000;

void runContext(asio::io_context &io_context)
{
    std::string threadId{};
    std::stringstream ss;
    ss << std::this_thread::get_id();
    ss >> threadId;
    std::cout << std::string("New thread for asio context: ")
                 + threadId + "\n";
    std::cout.flush();

    io_context.run();

    std::cout << std::string("Stopping thread: ")
                 + threadId + "\n";
    std::cout.flush();
};

class server
{
public:

    template<typename Executor>
    explicit server(Executor &executor)
            : acceptor_(executor)
    {
        using asio::ip::tcp;
        auto endpoint = tcp::endpoint(asio::ip::make_address_v4(localhost),
                                      port);
        acceptor_.open(endpoint.protocol());
        acceptor_.set_option(tcp::acceptor::reuse_address(true));
        acceptor_.bind(endpoint);
        acceptor_.listen();
    }

    void startAccepting()
    {
        acceptor_.async_accept(
                [this](const asio::error_code &errorCode,
                       asio::ip::tcp::socket peer)
                {
                    if (!errorCode)
                    {
                        startAccepting();
//                        std::cout << "Connection accepted\n";
                    }
                    if (errorCode == asio::error::operation_aborted)
                    {
//                        std::cout << "Stopped accepting connections\n";
                        return;
                    }
                });
    }

    void startRejecting()
    {
        // TODO: how to reject?
    }

    void stop()
    {
        // asio::post(acceptor_.get_executor(), [this](){acceptor_.cancel();}); // this line fixes deadlock
        acceptor_.cancel();
        // acceptor_.close(); // this line also fixes deadlock
    }

private:
    asio::ip::tcp::acceptor acceptor_;
};

int main()
{
    setvbuf(stdout, NULL, _IONBF, 0);
    asio::io_context context;

    // run server
    auto serverStrand = asio::make_strand(context);
    server server{serverStrand};
    server.startAccepting();

    // run client
    auto clientStrand = asio::make_strand(context);
    asio::ip::tcp::socket socket{clientStrand};

    size_t attempts = 1;
    auto endpoint = asio::ip::tcp::endpoint(
            asio::ip::make_address_v4(localhost), port);

    std::future<void> res = socket.async_connect(endpoint, asio::use_future);

    std::future<void> runningContexts[] = {
            std::async(std::launch::async, runContext, std::ref(context)),
            std::async(std::launch::async, runContext, std::ref(context))
    };

    res.get();
    server.stop();
    std::cout << "Server has been requested to stop" << std::endl;

    return 0;
}

更新
根据 sehe 的回答,我陷入了僵局,因为当调用 server.stop() 时,成功接受的完成处理程序已经 posted 但是由于从未调用取消,这导致上下文有待处理工作,因此最后陷入僵局(如果我理解正确的话)。 我仍然不明白的是:

  1. 服务器有一个单独的链(根据规范)强制非并发地并以 FIFO 顺序调用接受器的命令。没有提供执行程序的处理程序也必须在同一线程中处理。文档中没有关于 acceptor::cancel() 方法的线程安全性的内容,尽管不同的 acceptor 对象是 安全的 。所以我假设它是线程安全的(一个 strand 内不可能发生数据竞争)。 如果 cancel 通过 asio::post 显式 post 进入 acceptor 的线程,@sehe 的代码不会导致死锁。对于 500 次调用,没有死锁:
test 499
Awaiting client
New thread 3
New thread 2
Completed client
Server stopped
Accept: 127.0.0.1:14475
Accept: The I/O operation has been aborted because of either a thread exit or an application request.
Stopping thread: 2
Stopping thread: 3
Everyting shutdown

但是,如果我在同步之前删除打印代码并 stop() 导致延迟,则很容易出现死锁:

PS C:\dev\builds\asio_connection_logic\Release-MinGW-w64\bin> for ($i = 0; $i -lt 500; $i++){
>> Write-Output "
>> test $i"
>> .\sb.sf_example.exe}

test 0
New thread 2
New thread 3
Server stopped
Accept: 127.0.0.1:15160
PS C:\dev\builds\asio_connection_logic\Release-MinGW-w64\bin>
PS C:\dev\builds\asio_connection_logic\Release-MinGW-w64\bin> for ($i = 0; $i -lt 500; $i++){
>> Write-Output "
>> test $i"
>> .\sb.sf_example.exe}

test 0
New thread 2New thread 3

Server stopped
Accept: 127.0.0.1:15174
PS C:\dev\builds\asio_connection_logic\Release-MinGW-w64\bin> ^C

所以,结论是无论怎么调用acceptor.cancel(),都会死锁。

  1. 有没有办法避免 acceptor 死锁?

我在代码上做了一些抽脂并添加了一些跟踪:

Live On Wandbox

#include <boost/asio.hpp>
#include <iostream>
#include <functional>

constexpr unsigned short port = 12000;
namespace asio = boost::asio;
using boost::system::error_code;
using asio::ip::tcp;

void runContext(asio::io_context& io_context) {
    std::cout << "New thread " << std::this_thread::get_id() << std::endl;
    io_context.run();
    std::cout << "Stopping thread: " << std::this_thread::get_id() << std::endl;
}

class server {
  public:
    template <typename Executor>
    explicit server(Executor executor) : acceptor_(executor, {{}, port}) {
        acceptor_.set_option(tcp::acceptor::reuse_address(true));
    }

    void startAccepting() {
        acceptor_.listen();
        acceptLoop();
    }

    void stop()
    {
        //asio::post(acceptor_.get_executor(), [this]() {
            //acceptor_.cancel();
        //}); // this line fixes deadlock
        acceptor_.cancel();
        // acceptor_.close(); // this line also fixes deadlock
    }

  private:
    void acceptLoop() {
        acceptor_.async_accept([this](error_code errorCode, tcp::socket peer) {
            if (!errorCode) {
                std::cout << "Accept: " << peer.remote_endpoint() << std::endl;
                acceptLoop();
            } else {
                std::cout << "Accept: " << errorCode.message() << std::endl;
            }
        });
    }

    tcp::acceptor acceptor_;
};

int main() {
    setvbuf(stdout, NULL, _IONBF, 0);
    asio::io_context context;

    // run server
    server server{make_strand(context)};
    server.startAccepting();

    // run client
    tcp::socket socket{make_strand(context)};
    std::future<void> res = socket.async_connect({ {}, port}, asio::use_future);

    std::thread t1(runContext, std::ref(context));
    std::thread t2(runContext, std::ref(context));

    std::cout << "Awaiting client " << std::endl;

    res.get();

    std::cout << "Completed client" << std::endl;

    server.stop();

    std::cout << "Server stopped" << std::endl;

    t1.join();
    t2.join();
    std::cout << "Everyting shutdown" << std::endl;
}

如您所见,“正确的”运行 输出:

Awaiting client 
New thread 140712013797120
Accept: New thread 140712005404416
127.0.0.1:57500
Completed client
Server stopped
Accept: Operation canceled
Stopping thread: 140712013797120
Stopping thread: 140712005404416
Everyting shutdown

然而,一个“不正确的运行”打印:

New thread 140544269350656
Awaiting client 
New thread 140544260957952
Completed client
Server stopped
Accept: 127.0.0.1:48580
^C

关键在这里:

Server stopped
Accept: 127.0.0.1:48580

取消在接受之前。这意味着有一场比赛,其中 async_accept 的完成处理程序已经在非失败 errorCode 的飞行中。 (换句话说,async_connect 返回的时间比服务器能够处理其 async_accept 完成的时间早一点。)

确实,post挂在绳子上是修复它的一种方法。这是因为任何正在运行的处理程序都将在取消之前 运行,如果异步操作挂起,它将被取消。

注意:acceptor_.close() 的另一种方法会调用未定义的行为,因为 acceptor_ 本身存在数据竞争(这不是线程安全的)。


烦恼

旁白:

一个“问题”是std::launch::async。我不使用它。我认为它的行为是实现定义的,以至于它不是很有用。也许,改用 std::thread ,因为这就是你所追求的,在这里。在最近的提升中,使用 asio::thread_pool(2).

这里的答案说明了一些问题 Why should I use std::async?

已编辑问题的更新

你说得对,它具有相同的竞争条件 - 尽管没有 UB,所以很好。

Sidenote: You should probably stop calling it "deadlock" because there is none. It's a softlock, you're just waiting for something that never happens (async_connect). Deadlock is when multiple parties contend for locks in a way that can never be satisfied. This is just a soft-lock in the sense that a connection or even a network failure will allow the system to proceed.

所以我也删除了输出但添加了 BOOST_ASIO_ENABLE_HANDLER_TRACKING。生成的图片证实了上面的确切解释:

从这里开始,唯一明显的解决方案似乎是:

  1. 不要从客户端链中取消服务器。通过扩展它意味着

    • 对客户端和服务器使用相同的链(在进程间通信中不可行)
    • 信号关闭 一个连接中,因此消息无论如何都会在服务器链 上收到。这是“使关闭命令成为协议的一部分”的方式。
  2. post 一个 close 而不是 cancel,这会主动使任何 async_accept 成为错误

  3. 或者,在服务器中有一个状态机,在启动新的 async_accept

    之前手动检查我们是否应该仍然接受

注意

  • 与使用本机句柄的代码中的 .close() 方法一起使用可能导致其自身的竞争条件(其中另一个线程立即打开一个新的 file/socket 并重用filedescriptor 和本机代码没有注意到它正在与错误的套接字通话)。老实说,这似乎主要是流套接字(不是接受器)的问题,并且很容易用 .shutdown() 修复,所以请注意。

  • 我在 ASIO 的大量生产使用中从未遇到过这个问题。我想,在实践中,您的确切用例(.cancel() 与新的 accept 完成时间完美地结合在一起)并没有出现很多

  • 确实经常出现的一个类似用例是定时器,它也很难处理无竞争。在那里,消除歧义的因素是 额外状态,形式为 basic_waitable_timer::expiry()。参见例如