Boost::beast:多个 async_write 调用触发断言错误
Boost::beast: Multiple async_write calls are firing an assertion error
我正在为我的全双工服务器编写测试,当我进行多次(顺序)async_write
调用时(尽管被一条链覆盖),我从 boost::beast
在文件中 boost/beast/websocket/detail/stream_base.hpp
:
// If this assert goes off it means you are attempting to
// simultaneously initiate more than one of same asynchronous
// operation, which is not allowed. For example, you must wait
// for an async_read to complete before performing another
// async_read.
//
BOOST_ASSERT(id_ != T::id);
要在您的计算机上重现该问题: 可以找到重现该问题的完整客户端代码 (MCVE) here. It doesn't work in the link because you need a server (on your own machine, sorry as it's not possible to do this conveniently online, and this is better objectively to show that the problem is in the client, not in the server if I include it here). I used websocketd to create a server with the command ./websocketd --ssl --sslkey /path/to/server.key --sslcert /path/to/server.crt --port=8085 ./prog.py
where ./prog.py
is a simply python program that prints and flushes (I got it from websocketd home page)。
在客户端中写入的调用如下所示:
std::vector<std::vector<std::future<void>>> clients_write_futures(
clients_count);
for (int i = 0; i < clients_count; i++) {
clients_write_futures[i] = std::vector<std::future<void>>(num_of_messages);
for (int j = 0; j < num_of_messages; j++) {
clients_write_futures[i][j] =
clients[i]->write_data_async_future("Hello"); // writing here
}
}
请注意,我在示例中仅使用了 1 个客户端。客户端数组只是在测试时对服务器施加更多压力的概括。
我对问题的评论:
- 循环是顺序的;这不像我在多个线程中这样做
- 应该可以以全双工形式进行通信,其中不定数量的消息被发送到服务器。全双工通信还能如何完成?
- 我正在使用链来包装我的异步调用,以防止通过 io_service/io_context
在套接字中发生任何冲突
- 用调试器对此进行调查表明,循环的第二次迭代始终失败,这意味着我做的事情从根本上是错误的,但我不知道它是什么。换句话说:这显然是一个确定性问题。
我在这里做错了什么?如何向我的 websocket 服务器写入无限数量的消息?
编辑:
Sehe,我想首先为代码混乱道歉(没有意识到它如此糟糕),并感谢你为此付出的努力。我希望你问我为什么它同时以这种(可能)有组织和混乱的方式构建,答案很简单:主要是一个 gtest 代码,用于查看我的通用、多功能 websocket 客户端是否工作,我用来强调 -测试我的服务器(它使用大量多线程 io_service 对象,我认为这些对象很敏感并且需要广泛测试)。我计划在实际生产测试期间同时用许多客户端轰炸我的服务器。我发布这个问题是因为我不了解客户的行为。我在这个文件中所做的是创建一个 MCVE(人们一直要求在 SO 上)。我花了两个小时来剥离我的代码来创建它,最后我复制了我的 gtest 测试夹具代码(这是服务器上的夹具)并将其粘贴到 main 中并验证问题仍然存在于另一台服务器上并清理干净一点点(这显然不够)。
那么为什么我不捕获异常?因为 gtest 会捕获它们并认为测试失败。主要不是生产代码,但客户端是。我从你提到的内容中学到了很多东西,我不得不说抛出和抓住是愚蠢的,但我不知道 std::make_exception_ptr(),所以我找到了我的(dumm)方法来实现相同的结果: -).为什么有太多无用的功能:它们在这里没有用test/example,但通常我以后可以将它们用于其他事情,因为这个客户端不仅适用于这种情况。
现在回到问题:我不明白为什么我们必须用 strand_ 覆盖 async_write
当它在主线程的循环中顺序使用时(我错误地表达了我仅涵盖处理程序)。我明白为什么要覆盖处理程序,因为套接字不是线程安全的,并且多线程 io_service
会在那里造成竞争。我们也知道 io_service::post
本身是线程安全的(这就是为什么我认为包装 async_write 是不必要的)。你能解释一下我们需要包装 async_write 本身的原因是什么不是线程安全的吗?我知道你已经知道了,但同样的断言仍在触发。我们对处理程序和异步队列进行了排序,但客户端仍然不喜欢进行多次写入调用。还可以缺少什么?
(顺便说一句,如果你写,然后得到未来,然后读,然后再写,它有效。这就是为什么我使用期货来准确定义测试用例并定义我的测试的时间顺序。我在这里有点偏执。)
你说你用一根线遮住了你的async_write
。但是你不会做这样的事情。您可以看到所做的只是 将完成处理程序包装在该链中 。但是您 post 直接 进行异步操作 。
更糟糕的是,您正在从主线程执行此操作,而与您的 WSClient
实例关联的任何线程上正在进行异步操作,这意味着您正在并发访问不是'线程安全。
这是一场数据竞赛,所以你得到 Undefined Behaviour。
一个天真的修复可能是:
std::future<void> write_data_async_future(const std::string &data) {
// shared_ptr is used to ensure data's survival
std::shared_ptr<std::string> data_ptr = std::make_shared<std::string>(data);
std::shared_ptr<std::promise<void> > write_promise = std::make_shared<std::promise<void> >();
post(strand_, [=,self=shared_from_this()] {
websock.async_write(
boost::asio::buffer(*data_ptr),
boost::asio::bind_executor(strand_, std::bind(&WSClientSession::on_write_future, self,
std::placeholders::_1, std::placeholders::_2, data_ptr,
write_promise)));
});
return write_promise->get_future();
}
但这还不够。现在您可以确定 none 的异步操作或其完成将同时 运行,但您仍然可以 post 完成处理程序之前的下一个异步操作首先被调用。
要解决这个问题,您只需要排队。
老实说,我不确定您为什么如此关注使用期货的同步。这只会让实现这一目标变得非常困难。如果您可以描述您/功能上/想要实现的目标,我可以提出一个可能更短的解决方案。
代码审查笔记
在我明白代码是什么之前,我花了很多时间阅读你的代码。我不想抢走我一路上做的笔记。
Warning: This was quite a protracted code dive. I provide it because some of the insights might help you see how you need to restructure your code.
我开始阅读异步代码链,直到 on_handshake
( 设置 started_promise
值)。
然后我进入了你的 main
功能的 malstrom。你的主要功能是50行代码?!有多个并行容器并通过它们重复手动嵌套循环?
这是我重构后得到的:
int main() {
std::vector<actor> actors(1);
for (auto& a : actors) {
a.client = std::make_shared<WSClient>();
a.session_start_future = a.client->start("127.0.0.1", "8085");
a.messages.resize(50);
}
for (auto& a : actors) { a.session_start_future.get(); }
for (auto& a : actors) { for (auto& m : a.messages) {
m.write_future = a.client->write_data_async_future("Hello");
} }
for (auto& a : actors) { for (auto& m : a.messages) {
m.read_future = a.client->read_data_async_future();
} }
for (auto& a : actors) { for (auto& m : a.messages) {
m.write_future.get();
std::string result = m.read_future.get();
} }
}
所有的数据结构都被折叠进了小帮手actor
:
struct actor {
std::shared_ptr<WSClient> client;
std::future<void> session_start_future;
struct message {
std::string message = GenerateRandomString(20);
std::future<void> write_future;
std::future<std::string> read_future;
};
std::vector<message> messages;
};
We're approximately one hour of code review down the road now, with no gain, except that we can now TELL what main
is doing, and have some confidence that there isn't some trivial mistake with a loop variable or something.
正在取回
写作开始时:write_data_async_future
。等待。还有 write_data_async
和 write_data_sync
。为什么?你会想要阅读
- How to set error_code to asio::yield_context
- 或
更糟糕的是,WSClient
仅将这些中继到假定的 单个 会话。为什么 WSClient
和 WSClientSession
在这一点上完全不同?我说,有none.
进一步蒸发 30 行不太有用的代码,我们仍然有同样的失败,所以很好。
我们在哪里。 write_data_async_future
。哦,是的,我们需要非未来版本吗?不,所以,还有 40 行代码消失了。
现在,真的:write_data_async_future
:
std::future<void> write_data_async_future(const std::string &data) {
// shared_ptr is used to ensure data's survival
std::shared_ptr<std::string> data_ptr = std::make_shared<std::string>(data);
std::shared_ptr<std::promise<void> > write_promise = std::make_shared<std::promise<void> >();
websock.async_write(
boost::asio::buffer(*data_ptr),
boost::asio::bind_executor(strand_, std::bind(&WSClientSession::on_write_future, shared_from_this(),
std::placeholders::_1, std::placeholders::_2, data_ptr,
write_promise)));
return write_promise->get_future();
}
看起来...还可以。等等,有 on_write_future
?这可能意味着我们需要蒸发更多行未使用的代码。看着……是的。噗,走了
By now, the diffstat looks like this:
test.cpp | 683 +++++++++++++++++++++++----------------------------------------
1 file changed, 249 insertions(+), 434 deletions(-)
回到那个函数,让我们看看on_write_future
:
void on_write_future(boost::system::error_code ec, std::size_t bytes_transferred,
std::shared_ptr<std::string> data_posted,
std::shared_ptr<std::promise<void> > write_promise) {
boost::ignore_unused(bytes_transferred);
boost::ignore_unused(data_posted);
if (ec) {
try {
throw std::runtime_error("Error thrown while performing async write: " + ec.message());
} catch (...) {
write_promise->set_exception(std::current_exception());
}
return;
}
write_promise->set_value();
}
几个问题。通过的所有内容都将被忽略。我知道你传递 shared_ptrs 的目的是什么,但也许你应该将它们作为操作对象的一部分传递,以避免有这么多单独的共享指针。
抛出异常只是为了捕获它?嗯。对此我不确定。也许只是设置一个新的例外:
if (ec) {
write_promise->set_exception(
std::make_exception_ptr(std::system_error(ec, "async write failed")));
} else {
write_promise->set_value();
}
即便如此,现在还是存在概念上的问题。您自由使用 get()
而不会陷入 main
的方式意味着任何连接中的任何错误都只会中止所有操作。简单地中止 connection/session/client 错误将非常有用。其中,在您的代码中都是同义词(以及 io_context
和 thread
)。
旁注:您将线程存储为成员,但始终将其分离。那就是会员从此没用了
At this point I took a break from reviewing, and as it happens I got the brainwave that showed me the issue. The halfbaked result of my exercise are here. Note that you cannot use it because it doesn't actually fix the problem. But it might help in other ways?
我正在为我的全双工服务器编写测试,当我进行多次(顺序)async_write
调用时(尽管被一条链覆盖),我从 boost::beast
在文件中 boost/beast/websocket/detail/stream_base.hpp
:
// If this assert goes off it means you are attempting to
// simultaneously initiate more than one of same asynchronous
// operation, which is not allowed. For example, you must wait
// for an async_read to complete before performing another
// async_read.
//
BOOST_ASSERT(id_ != T::id);
要在您的计算机上重现该问题: 可以找到重现该问题的完整客户端代码 (MCVE) here. It doesn't work in the link because you need a server (on your own machine, sorry as it's not possible to do this conveniently online, and this is better objectively to show that the problem is in the client, not in the server if I include it here). I used websocketd to create a server with the command ./websocketd --ssl --sslkey /path/to/server.key --sslcert /path/to/server.crt --port=8085 ./prog.py
where ./prog.py
is a simply python program that prints and flushes (I got it from websocketd home page)。
在客户端中写入的调用如下所示:
std::vector<std::vector<std::future<void>>> clients_write_futures(
clients_count);
for (int i = 0; i < clients_count; i++) {
clients_write_futures[i] = std::vector<std::future<void>>(num_of_messages);
for (int j = 0; j < num_of_messages; j++) {
clients_write_futures[i][j] =
clients[i]->write_data_async_future("Hello"); // writing here
}
}
请注意,我在示例中仅使用了 1 个客户端。客户端数组只是在测试时对服务器施加更多压力的概括。
我对问题的评论:
- 循环是顺序的;这不像我在多个线程中这样做
- 应该可以以全双工形式进行通信,其中不定数量的消息被发送到服务器。全双工通信还能如何完成?
- 我正在使用链来包装我的异步调用,以防止通过 io_service/io_context 在套接字中发生任何冲突
- 用调试器对此进行调查表明,循环的第二次迭代始终失败,这意味着我做的事情从根本上是错误的,但我不知道它是什么。换句话说:这显然是一个确定性问题。
我在这里做错了什么?如何向我的 websocket 服务器写入无限数量的消息?
编辑:
Sehe,我想首先为代码混乱道歉(没有意识到它如此糟糕),并感谢你为此付出的努力。我希望你问我为什么它同时以这种(可能)有组织和混乱的方式构建,答案很简单:主要是一个 gtest 代码,用于查看我的通用、多功能 websocket 客户端是否工作,我用来强调 -测试我的服务器(它使用大量多线程 io_service 对象,我认为这些对象很敏感并且需要广泛测试)。我计划在实际生产测试期间同时用许多客户端轰炸我的服务器。我发布这个问题是因为我不了解客户的行为。我在这个文件中所做的是创建一个 MCVE(人们一直要求在 SO 上)。我花了两个小时来剥离我的代码来创建它,最后我复制了我的 gtest 测试夹具代码(这是服务器上的夹具)并将其粘贴到 main 中并验证问题仍然存在于另一台服务器上并清理干净一点点(这显然不够)。
那么为什么我不捕获异常?因为 gtest 会捕获它们并认为测试失败。主要不是生产代码,但客户端是。我从你提到的内容中学到了很多东西,我不得不说抛出和抓住是愚蠢的,但我不知道 std::make_exception_ptr(),所以我找到了我的(dumm)方法来实现相同的结果: -).为什么有太多无用的功能:它们在这里没有用test/example,但通常我以后可以将它们用于其他事情,因为这个客户端不仅适用于这种情况。
现在回到问题:我不明白为什么我们必须用 strand_ 覆盖 async_write
当它在主线程的循环中顺序使用时(我错误地表达了我仅涵盖处理程序)。我明白为什么要覆盖处理程序,因为套接字不是线程安全的,并且多线程 io_service
会在那里造成竞争。我们也知道 io_service::post
本身是线程安全的(这就是为什么我认为包装 async_write 是不必要的)。你能解释一下我们需要包装 async_write 本身的原因是什么不是线程安全的吗?我知道你已经知道了,但同样的断言仍在触发。我们对处理程序和异步队列进行了排序,但客户端仍然不喜欢进行多次写入调用。还可以缺少什么?
(顺便说一句,如果你写,然后得到未来,然后读,然后再写,它有效。这就是为什么我使用期货来准确定义测试用例并定义我的测试的时间顺序。我在这里有点偏执。)
你说你用一根线遮住了你的async_write
。但是你不会做这样的事情。您可以看到所做的只是 将完成处理程序包装在该链中 。但是您 post 直接 进行异步操作 。
更糟糕的是,您正在从主线程执行此操作,而与您的 WSClient
实例关联的任何线程上正在进行异步操作,这意味着您正在并发访问不是'线程安全。
这是一场数据竞赛,所以你得到 Undefined Behaviour。
一个天真的修复可能是:
std::future<void> write_data_async_future(const std::string &data) {
// shared_ptr is used to ensure data's survival
std::shared_ptr<std::string> data_ptr = std::make_shared<std::string>(data);
std::shared_ptr<std::promise<void> > write_promise = std::make_shared<std::promise<void> >();
post(strand_, [=,self=shared_from_this()] {
websock.async_write(
boost::asio::buffer(*data_ptr),
boost::asio::bind_executor(strand_, std::bind(&WSClientSession::on_write_future, self,
std::placeholders::_1, std::placeholders::_2, data_ptr,
write_promise)));
});
return write_promise->get_future();
}
但这还不够。现在您可以确定 none 的异步操作或其完成将同时 运行,但您仍然可以 post 完成处理程序之前的下一个异步操作首先被调用。
要解决这个问题,您只需要排队。
老实说,我不确定您为什么如此关注使用期货的同步。这只会让实现这一目标变得非常困难。如果您可以描述您/功能上/想要实现的目标,我可以提出一个可能更短的解决方案。
代码审查笔记
在我明白代码是什么之前,我花了很多时间阅读你的代码。我不想抢走我一路上做的笔记。
Warning: This was quite a protracted code dive. I provide it because some of the insights might help you see how you need to restructure your code.
我开始阅读异步代码链,直到 on_handshake
( 设置 started_promise
值)。
然后我进入了你的 main
功能的 malstrom。你的主要功能是50行代码?!有多个并行容器并通过它们重复手动嵌套循环?
这是我重构后得到的:
int main() {
std::vector<actor> actors(1);
for (auto& a : actors) {
a.client = std::make_shared<WSClient>();
a.session_start_future = a.client->start("127.0.0.1", "8085");
a.messages.resize(50);
}
for (auto& a : actors) { a.session_start_future.get(); }
for (auto& a : actors) { for (auto& m : a.messages) {
m.write_future = a.client->write_data_async_future("Hello");
} }
for (auto& a : actors) { for (auto& m : a.messages) {
m.read_future = a.client->read_data_async_future();
} }
for (auto& a : actors) { for (auto& m : a.messages) {
m.write_future.get();
std::string result = m.read_future.get();
} }
}
所有的数据结构都被折叠进了小帮手actor
:
struct actor {
std::shared_ptr<WSClient> client;
std::future<void> session_start_future;
struct message {
std::string message = GenerateRandomString(20);
std::future<void> write_future;
std::future<std::string> read_future;
};
std::vector<message> messages;
};
We're approximately one hour of code review down the road now, with no gain, except that we can now TELL what
main
is doing, and have some confidence that there isn't some trivial mistake with a loop variable or something.
正在取回
写作开始时:write_data_async_future
。等待。还有 write_data_async
和 write_data_sync
。为什么?你会想要阅读
- How to set error_code to asio::yield_context
- 或
更糟糕的是,WSClient
仅将这些中继到假定的 单个 会话。为什么 WSClient
和 WSClientSession
在这一点上完全不同?我说,有none.
进一步蒸发 30 行不太有用的代码,我们仍然有同样的失败,所以很好。
我们在哪里。 write_data_async_future
。哦,是的,我们需要非未来版本吗?不,所以,还有 40 行代码消失了。
现在,真的:write_data_async_future
:
std::future<void> write_data_async_future(const std::string &data) {
// shared_ptr is used to ensure data's survival
std::shared_ptr<std::string> data_ptr = std::make_shared<std::string>(data);
std::shared_ptr<std::promise<void> > write_promise = std::make_shared<std::promise<void> >();
websock.async_write(
boost::asio::buffer(*data_ptr),
boost::asio::bind_executor(strand_, std::bind(&WSClientSession::on_write_future, shared_from_this(),
std::placeholders::_1, std::placeholders::_2, data_ptr,
write_promise)));
return write_promise->get_future();
}
看起来...还可以。等等,有 on_write_future
?这可能意味着我们需要蒸发更多行未使用的代码。看着……是的。噗,走了
By now, the diffstat looks like this:
test.cpp | 683 +++++++++++++++++++++++---------------------------------------- 1 file changed, 249 insertions(+), 434 deletions(-)
回到那个函数,让我们看看on_write_future
:
void on_write_future(boost::system::error_code ec, std::size_t bytes_transferred,
std::shared_ptr<std::string> data_posted,
std::shared_ptr<std::promise<void> > write_promise) {
boost::ignore_unused(bytes_transferred);
boost::ignore_unused(data_posted);
if (ec) {
try {
throw std::runtime_error("Error thrown while performing async write: " + ec.message());
} catch (...) {
write_promise->set_exception(std::current_exception());
}
return;
}
write_promise->set_value();
}
几个问题。通过的所有内容都将被忽略。我知道你传递 shared_ptrs 的目的是什么,但也许你应该将它们作为操作对象的一部分传递,以避免有这么多单独的共享指针。
抛出异常只是为了捕获它?嗯。对此我不确定。也许只是设置一个新的例外:
if (ec) {
write_promise->set_exception(
std::make_exception_ptr(std::system_error(ec, "async write failed")));
} else {
write_promise->set_value();
}
即便如此,现在还是存在概念上的问题。您自由使用 get()
而不会陷入 main
的方式意味着任何连接中的任何错误都只会中止所有操作。简单地中止 connection/session/client 错误将非常有用。其中,在您的代码中都是同义词(以及 io_context
和 thread
)。
旁注:您将线程存储为成员,但始终将其分离。那就是会员从此没用了
At this point I took a break from reviewing, and as it happens I got the brainwave that showed me the issue. The halfbaked result of my exercise are here. Note that you cannot use it because it doesn't actually fix the problem. But it might help in other ways?