ASIO:使用多条线和一条线时出现死锁 io_context
ASIO: getting deadlock when using several strands and threads with one io_context
我正在 Windows 最新版本的 ASIO 上使用 Mingw64 进行编译。
我有一个用于接受 tcp 连接的沙箱代码。我使用一个上下文,每个接受器一个链,一个套接字和 2 个线程(我在文档中读到 post 进入两个不同的链并不能保证并发调用)。出于某种原因,我在执行结束时陷入僵局,我不知道为什么会这样。如果出现以下情况则不会发生:
- 我使用 1 个线程和一个公共上下文
- 有时 当我使用 1 个上下文和 2 个没有链的线程时
- 我使用 2 个不同的上下文和 2 个没有链的不同线程
- 当
std::future
同步和停止服务器的请求之间经过一段时间时
- 有时 当我 post
acceptor.cancel()
明确地向其执行者
如果我 close
acceptor 也不会发生死锁。
我未能在文档中找到任何可能解释此类行为原因的相关信息。而且我不想忽略它,因为它可能会导致不可预测的问题。
这是我的沙箱代码:
#include <asio.hpp>
#include <iostream>
#include <sstream>
#include <functional>
constexpr const char localhost[] = "127.0.0.1";
constexpr unsigned short port = 12000;
void runContext(asio::io_context &io_context)
{
std::string threadId{};
std::stringstream ss;
ss << std::this_thread::get_id();
ss >> threadId;
std::cout << std::string("New thread for asio context: ")
+ threadId + "\n";
std::cout.flush();
io_context.run();
std::cout << std::string("Stopping thread: ")
+ threadId + "\n";
std::cout.flush();
};
class server
{
public:
template<typename Executor>
explicit server(Executor &executor)
: acceptor_(executor)
{
using asio::ip::tcp;
auto endpoint = tcp::endpoint(asio::ip::make_address_v4(localhost),
port);
acceptor_.open(endpoint.protocol());
acceptor_.set_option(tcp::acceptor::reuse_address(true));
acceptor_.bind(endpoint);
acceptor_.listen();
}
void startAccepting()
{
acceptor_.async_accept(
[this](const asio::error_code &errorCode,
asio::ip::tcp::socket peer)
{
if (!errorCode)
{
startAccepting();
// std::cout << "Connection accepted\n";
}
if (errorCode == asio::error::operation_aborted)
{
// std::cout << "Stopped accepting connections\n";
return;
}
});
}
void startRejecting()
{
// TODO: how to reject?
}
void stop()
{
// asio::post(acceptor_.get_executor(), [this](){acceptor_.cancel();}); // this line fixes deadlock
acceptor_.cancel();
// acceptor_.close(); // this line also fixes deadlock
}
private:
asio::ip::tcp::acceptor acceptor_;
};
int main()
{
setvbuf(stdout, NULL, _IONBF, 0);
asio::io_context context;
// run server
auto serverStrand = asio::make_strand(context);
server server{serverStrand};
server.startAccepting();
// run client
auto clientStrand = asio::make_strand(context);
asio::ip::tcp::socket socket{clientStrand};
size_t attempts = 1;
auto endpoint = asio::ip::tcp::endpoint(
asio::ip::make_address_v4(localhost), port);
std::future<void> res = socket.async_connect(endpoint, asio::use_future);
std::future<void> runningContexts[] = {
std::async(std::launch::async, runContext, std::ref(context)),
std::async(std::launch::async, runContext, std::ref(context))
};
res.get();
server.stop();
std::cout << "Server has been requested to stop" << std::endl;
return 0;
}
更新
根据 sehe 的回答,我陷入了僵局,因为当调用 server.stop()
时,成功接受的完成处理程序已经 posted 但是由于从未调用取消,这导致上下文有待处理工作,因此最后陷入僵局(如果我理解正确的话)。
我仍然不明白的是:
- 服务器有一个单独的链(根据规范)强制非并发地并以 FIFO 顺序调用接受器的命令。没有提供执行程序的处理程序也必须在同一线程中处理。文档中没有关于
acceptor::cancel()
方法的线程安全性的内容,尽管不同的 acceptor
对象是 安全的 。所以我假设它是线程安全的(一个 strand
内不可能发生数据竞争)。
如果 cancel
通过 asio::post
显式 post 进入 acceptor
的线程,@sehe 的代码不会导致死锁。对于 500 次调用,没有死锁:
test 499
Awaiting client
New thread 3
New thread 2
Completed client
Server stopped
Accept: 127.0.0.1:14475
Accept: The I/O operation has been aborted because of either a thread exit or an application request.
Stopping thread: 2
Stopping thread: 3
Everyting shutdown
但是,如果我在同步之前删除打印代码并 stop()
导致延迟,则很容易出现死锁:
PS C:\dev\builds\asio_connection_logic\Release-MinGW-w64\bin> for ($i = 0; $i -lt 500; $i++){
>> Write-Output "
>> test $i"
>> .\sb.sf_example.exe}
test 0
New thread 2
New thread 3
Server stopped
Accept: 127.0.0.1:15160
PS C:\dev\builds\asio_connection_logic\Release-MinGW-w64\bin>
PS C:\dev\builds\asio_connection_logic\Release-MinGW-w64\bin> for ($i = 0; $i -lt 500; $i++){
>> Write-Output "
>> test $i"
>> .\sb.sf_example.exe}
test 0
New thread 2New thread 3
Server stopped
Accept: 127.0.0.1:15174
PS C:\dev\builds\asio_connection_logic\Release-MinGW-w64\bin> ^C
所以,结论是无论怎么调用acceptor.cancel()
,都会死锁。
- 有没有办法避免 acceptor 死锁?
我在代码上做了一些抽脂并添加了一些跟踪:
#include <boost/asio.hpp>
#include <iostream>
#include <functional>
constexpr unsigned short port = 12000;
namespace asio = boost::asio;
using boost::system::error_code;
using asio::ip::tcp;
void runContext(asio::io_context& io_context) {
std::cout << "New thread " << std::this_thread::get_id() << std::endl;
io_context.run();
std::cout << "Stopping thread: " << std::this_thread::get_id() << std::endl;
}
class server {
public:
template <typename Executor>
explicit server(Executor executor) : acceptor_(executor, {{}, port}) {
acceptor_.set_option(tcp::acceptor::reuse_address(true));
}
void startAccepting() {
acceptor_.listen();
acceptLoop();
}
void stop()
{
//asio::post(acceptor_.get_executor(), [this]() {
//acceptor_.cancel();
//}); // this line fixes deadlock
acceptor_.cancel();
// acceptor_.close(); // this line also fixes deadlock
}
private:
void acceptLoop() {
acceptor_.async_accept([this](error_code errorCode, tcp::socket peer) {
if (!errorCode) {
std::cout << "Accept: " << peer.remote_endpoint() << std::endl;
acceptLoop();
} else {
std::cout << "Accept: " << errorCode.message() << std::endl;
}
});
}
tcp::acceptor acceptor_;
};
int main() {
setvbuf(stdout, NULL, _IONBF, 0);
asio::io_context context;
// run server
server server{make_strand(context)};
server.startAccepting();
// run client
tcp::socket socket{make_strand(context)};
std::future<void> res = socket.async_connect({ {}, port}, asio::use_future);
std::thread t1(runContext, std::ref(context));
std::thread t2(runContext, std::ref(context));
std::cout << "Awaiting client " << std::endl;
res.get();
std::cout << "Completed client" << std::endl;
server.stop();
std::cout << "Server stopped" << std::endl;
t1.join();
t2.join();
std::cout << "Everyting shutdown" << std::endl;
}
如您所见,“正确的”运行 输出:
Awaiting client
New thread 140712013797120
Accept: New thread 140712005404416
127.0.0.1:57500
Completed client
Server stopped
Accept: Operation canceled
Stopping thread: 140712013797120
Stopping thread: 140712005404416
Everyting shutdown
然而,一个“不正确的运行”打印:
New thread 140544269350656
Awaiting client
New thread 140544260957952
Completed client
Server stopped
Accept: 127.0.0.1:48580
^C
关键在这里:
Server stopped
Accept: 127.0.0.1:48580
取消在接受之前。这意味着有一场比赛,其中 async_accept
的完成处理程序已经在非失败 errorCode
的飞行中。 (换句话说,async_connect
返回的时间比服务器能够处理其 async_accept
完成的时间早一点。)
确实,post挂在绳子上是修复它的一种方法。这是因为任何正在运行的处理程序都将在取消之前 运行,如果异步操作挂起,它将被取消。
注意:acceptor_.close()
的另一种方法会调用未定义的行为,因为 acceptor_
本身存在数据竞争(这不是线程安全的)。
烦恼
旁白:
一个“问题”是std::launch::async
。我不使用它。我认为它的行为是实现定义的,以至于它不是很有用。也许,改用 std::thread
,因为这就是你所追求的,在这里。在最近的提升中,使用 asio::thread_pool(2)
.
这里的答案说明了一些问题 Why should I use std::async?
已编辑问题的更新
你说得对,它具有相同的竞争条件 - 尽管没有 UB,所以很好。
Sidenote: You should probably stop calling it "deadlock" because there is none. It's a softlock, you're just waiting for something that never happens (async_connect
). Deadlock is when multiple parties contend for locks in a way that can never be satisfied. This is just a soft-lock in the sense that a connection or even a network failure will allow the system to proceed.
所以我也删除了输出但添加了 BOOST_ASIO_ENABLE_HANDLER_TRACKING。生成的图片证实了上面的确切解释:
从这里开始,唯一明显的解决方案似乎是:
不要从客户端链中取消服务器。通过扩展它意味着
- 对客户端和服务器使用相同的链(在进程间通信中不可行)
- 信号关闭在 一个连接中,因此消息无论如何都会在服务器链 上收到。这是“使关闭命令成为协议的一部分”的方式。
post 一个 close
而不是 cancel
,这会主动使任何 async_accept
成为错误
或者,在服务器中有一个状态机,在启动新的 async_accept
之前手动检查我们是否应该仍然接受
注意那
与使用本机句柄的代码中的 .close()
方法一起使用可能导致其自身的竞争条件(其中另一个线程立即打开一个新的 file/socket 并重用filedescriptor 和本机代码没有注意到它正在与错误的套接字通话)。老实说,这似乎主要是流套接字(不是接受器)的问题,并且很容易用 .shutdown()
修复,所以请注意。
我在 ASIO 的大量生产使用中从未遇到过这个问题。我想,在实践中,您的确切用例(.cancel()
与新的 accept
完成时间完美地结合在一起)并没有出现很多
确实经常出现的一个类似用例是定时器,它也很难处理无竞争。在那里,消除歧义的因素是 也 额外状态,形式为 basic_waitable_timer::expiry()
。参见例如
我正在 Windows 最新版本的 ASIO 上使用 Mingw64 进行编译。
我有一个用于接受 tcp 连接的沙箱代码。我使用一个上下文,每个接受器一个链,一个套接字和 2 个线程(我在文档中读到 post 进入两个不同的链并不能保证并发调用)。出于某种原因,我在执行结束时陷入僵局,我不知道为什么会这样。如果出现以下情况则不会发生:
- 我使用 1 个线程和一个公共上下文
- 有时 当我使用 1 个上下文和 2 个没有链的线程时
- 我使用 2 个不同的上下文和 2 个没有链的不同线程
- 当
std::future
同步和停止服务器的请求之间经过一段时间时 - 有时 当我 post
acceptor.cancel()
明确地向其执行者
如果我 close
acceptor 也不会发生死锁。
我未能在文档中找到任何可能解释此类行为原因的相关信息。而且我不想忽略它,因为它可能会导致不可预测的问题。
这是我的沙箱代码:
#include <asio.hpp>
#include <iostream>
#include <sstream>
#include <functional>
constexpr const char localhost[] = "127.0.0.1";
constexpr unsigned short port = 12000;
void runContext(asio::io_context &io_context)
{
std::string threadId{};
std::stringstream ss;
ss << std::this_thread::get_id();
ss >> threadId;
std::cout << std::string("New thread for asio context: ")
+ threadId + "\n";
std::cout.flush();
io_context.run();
std::cout << std::string("Stopping thread: ")
+ threadId + "\n";
std::cout.flush();
};
class server
{
public:
template<typename Executor>
explicit server(Executor &executor)
: acceptor_(executor)
{
using asio::ip::tcp;
auto endpoint = tcp::endpoint(asio::ip::make_address_v4(localhost),
port);
acceptor_.open(endpoint.protocol());
acceptor_.set_option(tcp::acceptor::reuse_address(true));
acceptor_.bind(endpoint);
acceptor_.listen();
}
void startAccepting()
{
acceptor_.async_accept(
[this](const asio::error_code &errorCode,
asio::ip::tcp::socket peer)
{
if (!errorCode)
{
startAccepting();
// std::cout << "Connection accepted\n";
}
if (errorCode == asio::error::operation_aborted)
{
// std::cout << "Stopped accepting connections\n";
return;
}
});
}
void startRejecting()
{
// TODO: how to reject?
}
void stop()
{
// asio::post(acceptor_.get_executor(), [this](){acceptor_.cancel();}); // this line fixes deadlock
acceptor_.cancel();
// acceptor_.close(); // this line also fixes deadlock
}
private:
asio::ip::tcp::acceptor acceptor_;
};
int main()
{
setvbuf(stdout, NULL, _IONBF, 0);
asio::io_context context;
// run server
auto serverStrand = asio::make_strand(context);
server server{serverStrand};
server.startAccepting();
// run client
auto clientStrand = asio::make_strand(context);
asio::ip::tcp::socket socket{clientStrand};
size_t attempts = 1;
auto endpoint = asio::ip::tcp::endpoint(
asio::ip::make_address_v4(localhost), port);
std::future<void> res = socket.async_connect(endpoint, asio::use_future);
std::future<void> runningContexts[] = {
std::async(std::launch::async, runContext, std::ref(context)),
std::async(std::launch::async, runContext, std::ref(context))
};
res.get();
server.stop();
std::cout << "Server has been requested to stop" << std::endl;
return 0;
}
更新
根据 sehe 的回答,我陷入了僵局,因为当调用 server.stop()
时,成功接受的完成处理程序已经 posted 但是由于从未调用取消,这导致上下文有待处理工作,因此最后陷入僵局(如果我理解正确的话)。
我仍然不明白的是:
- 服务器有一个单独的链(根据规范)强制非并发地并以 FIFO 顺序调用接受器的命令。没有提供执行程序的处理程序也必须在同一线程中处理。文档中没有关于
acceptor::cancel()
方法的线程安全性的内容,尽管不同的acceptor
对象是 安全的 。所以我假设它是线程安全的(一个strand
内不可能发生数据竞争)。 如果cancel
通过asio::post
显式 post 进入acceptor
的线程,@sehe 的代码不会导致死锁。对于 500 次调用,没有死锁:
test 499
Awaiting client
New thread 3
New thread 2
Completed client
Server stopped
Accept: 127.0.0.1:14475
Accept: The I/O operation has been aborted because of either a thread exit or an application request.
Stopping thread: 2
Stopping thread: 3
Everyting shutdown
但是,如果我在同步之前删除打印代码并 stop()
导致延迟,则很容易出现死锁:
PS C:\dev\builds\asio_connection_logic\Release-MinGW-w64\bin> for ($i = 0; $i -lt 500; $i++){
>> Write-Output "
>> test $i"
>> .\sb.sf_example.exe}
test 0
New thread 2
New thread 3
Server stopped
Accept: 127.0.0.1:15160
PS C:\dev\builds\asio_connection_logic\Release-MinGW-w64\bin>
PS C:\dev\builds\asio_connection_logic\Release-MinGW-w64\bin> for ($i = 0; $i -lt 500; $i++){
>> Write-Output "
>> test $i"
>> .\sb.sf_example.exe}
test 0
New thread 2New thread 3
Server stopped
Accept: 127.0.0.1:15174
PS C:\dev\builds\asio_connection_logic\Release-MinGW-w64\bin> ^C
所以,结论是无论怎么调用acceptor.cancel()
,都会死锁。
- 有没有办法避免 acceptor 死锁?
我在代码上做了一些抽脂并添加了一些跟踪:
#include <boost/asio.hpp>
#include <iostream>
#include <functional>
constexpr unsigned short port = 12000;
namespace asio = boost::asio;
using boost::system::error_code;
using asio::ip::tcp;
void runContext(asio::io_context& io_context) {
std::cout << "New thread " << std::this_thread::get_id() << std::endl;
io_context.run();
std::cout << "Stopping thread: " << std::this_thread::get_id() << std::endl;
}
class server {
public:
template <typename Executor>
explicit server(Executor executor) : acceptor_(executor, {{}, port}) {
acceptor_.set_option(tcp::acceptor::reuse_address(true));
}
void startAccepting() {
acceptor_.listen();
acceptLoop();
}
void stop()
{
//asio::post(acceptor_.get_executor(), [this]() {
//acceptor_.cancel();
//}); // this line fixes deadlock
acceptor_.cancel();
// acceptor_.close(); // this line also fixes deadlock
}
private:
void acceptLoop() {
acceptor_.async_accept([this](error_code errorCode, tcp::socket peer) {
if (!errorCode) {
std::cout << "Accept: " << peer.remote_endpoint() << std::endl;
acceptLoop();
} else {
std::cout << "Accept: " << errorCode.message() << std::endl;
}
});
}
tcp::acceptor acceptor_;
};
int main() {
setvbuf(stdout, NULL, _IONBF, 0);
asio::io_context context;
// run server
server server{make_strand(context)};
server.startAccepting();
// run client
tcp::socket socket{make_strand(context)};
std::future<void> res = socket.async_connect({ {}, port}, asio::use_future);
std::thread t1(runContext, std::ref(context));
std::thread t2(runContext, std::ref(context));
std::cout << "Awaiting client " << std::endl;
res.get();
std::cout << "Completed client" << std::endl;
server.stop();
std::cout << "Server stopped" << std::endl;
t1.join();
t2.join();
std::cout << "Everyting shutdown" << std::endl;
}
如您所见,“正确的”运行 输出:
Awaiting client
New thread 140712013797120
Accept: New thread 140712005404416
127.0.0.1:57500
Completed client
Server stopped
Accept: Operation canceled
Stopping thread: 140712013797120
Stopping thread: 140712005404416
Everyting shutdown
然而,一个“不正确的运行”打印:
New thread 140544269350656
Awaiting client
New thread 140544260957952
Completed client
Server stopped
Accept: 127.0.0.1:48580
^C
关键在这里:
Server stopped
Accept: 127.0.0.1:48580
取消在接受之前。这意味着有一场比赛,其中 async_accept
的完成处理程序已经在非失败 errorCode
的飞行中。 (换句话说,async_connect
返回的时间比服务器能够处理其 async_accept
完成的时间早一点。)
确实,post挂在绳子上是修复它的一种方法。这是因为任何正在运行的处理程序都将在取消之前 运行,如果异步操作挂起,它将被取消。
注意:acceptor_.close()
的另一种方法会调用未定义的行为,因为 acceptor_
本身存在数据竞争(这不是线程安全的)。
烦恼
旁白:
一个“问题”是std::launch::async
。我不使用它。我认为它的行为是实现定义的,以至于它不是很有用。也许,改用 std::thread
,因为这就是你所追求的,在这里。在最近的提升中,使用 asio::thread_pool(2)
.
这里的答案说明了一些问题 Why should I use std::async?
已编辑问题的更新
你说得对,它具有相同的竞争条件 - 尽管没有 UB,所以很好。
Sidenote: You should probably stop calling it "deadlock" because there is none. It's a softlock, you're just waiting for something that never happens (
async_connect
). Deadlock is when multiple parties contend for locks in a way that can never be satisfied. This is just a soft-lock in the sense that a connection or even a network failure will allow the system to proceed.
所以我也删除了输出但添加了 BOOST_ASIO_ENABLE_HANDLER_TRACKING。生成的图片证实了上面的确切解释:
从这里开始,唯一明显的解决方案似乎是:
不要从客户端链中取消服务器。通过扩展它意味着
- 对客户端和服务器使用相同的链(在进程间通信中不可行)
- 信号关闭在 一个连接中,因此消息无论如何都会在服务器链 上收到。这是“使关闭命令成为协议的一部分”的方式。
post 一个
close
而不是cancel
,这会主动使任何async_accept
成为错误或者,在服务器中有一个状态机,在启动新的
之前手动检查我们是否应该仍然接受async_accept
注意那
与使用本机句柄的代码中的
.close()
方法一起使用可能导致其自身的竞争条件(其中另一个线程立即打开一个新的 file/socket 并重用filedescriptor 和本机代码没有注意到它正在与错误的套接字通话)。老实说,这似乎主要是流套接字(不是接受器)的问题,并且很容易用.shutdown()
修复,所以请注意。我在 ASIO 的大量生产使用中从未遇到过这个问题。我想,在实践中,您的确切用例(
.cancel()
与新的accept
完成时间完美地结合在一起)并没有出现很多确实经常出现的一个类似用例是定时器,它也很难处理无竞争。在那里,消除歧义的因素是 也 额外状态,形式为
basic_waitable_timer::expiry()
。参见例如