boost::async读一写卡住
boost::async read write stuck after one read and two writes
我试图构建一个服务器-客户端应用程序,服务器端是带有 boost::asio 异步的 c++,但是当我是 运行 应用程序时,它在 1 次读取和两次写入后卡住了。
这是我删除 do_write
函数调用时的相关代码 我收到了我发送的所有 5 条消息
编辑
当我 运行 调试器时,我能够接收所有五个消息并发送所有响应
void start()
{
std::thread t1([this](){do_write();});
t1.detach();
std::thread t2([this](){do_read();});
t2.detach();
}
void do_write()
{
auto self(shared_from_this());
Message msg;
while(order_response_queue_.empty())
{}
auto order_response = order_response_queue_.front();
order_response_queue_.pop();
try {
auto m = order_response.SerializeToString();
msg.body_length(std::strlen(m.c_str()));
std::memcpy(msg.body(), m.c_str(), std::strlen(m.c_str()));
msg.encode_header();
std::memcpy(data_,msg.data(), msg.length());
//std::memcpy(static_cast<void *>(msg.data()), data_, msg.length());
} catch (std::exception& e )
{
std::cout << e.what();
}
std::cout <<"write: " << msg.body() << "\n";
boost::asio::async_write(socket_, boost::asio::buffer(data_,msg.length()),
[this, self](boost::system::error_code ec, std::size_t /*length*/)
{
if (ec){
std::cerr << "write error:" << ec.value() << " message: " << ec.message() << "\n";
}
do_write();
});
}
void do_read()
{
auto self(shared_from_this());
Message msg;
socket_.async_read_some(boost::asio::buffer(res.data(), res.header_length),
[this, self](boost::system::error_code ec, std::size_t length) {
if (!ec && res.decode_header()) {
std::string st(res.body());
boost::asio::async_read(socket_, boost::asio::buffer(res.body(), res.body_length()), [this](boost::system::error_code ec, std::size_t length) {
if (!ec) {
std::cout << "read " << res.body() << "\n";
req_.DeserializeFromChar(res.body());
order_request_queue_.push(req_);
} else {
if (ec) {
std::cerr << "read error:" << ec.value() << " message: "
<< ec.message() << "\n";
}
socket_.close();
}
});
}
do_read();
});
}
这里是 io_service
class Server
{
public:
Server(boost::asio::io_service& io_service, short port,std::queue<OrderRequest> &order_request_queue,
std::queue<OrderResponse> &order_response_queue)
: acceptor_(io_service, tcp::endpoint(tcp::v4(), port)),
socket_(io_service) , order_response_queue_(order_response_queue), order_request_queue_(order_request_queue)
{
do_accept();
}
private:
void do_accept(){
acceptor_.async_accept(socket_,
[this](boost::system::error_code ec) {
if (!ec) {
std::cout << "accept connection\n";
std::make_shared<Session>(std::move(socket_),order_request_queue_, order_response_queue_)->start();
}
do_accept();
});
}
tcp::acceptor acceptor_;
tcp::socket socket_;
std::queue<OrderResponse> &order_response_queue_;
std::queue<OrderRequest> &order_request_queue_;
};
你应该制作样本 self-contained。
- 什么是
Message
,
data_
从哪里来,
res
从哪里来
- 为什么
data_
与 msg.length()
, 一起使用
- 是否包括 header 长度,
req_
和res
有什么关系
- 这些的生命周期是多少。
- 为什么要对线程使用异步
- 执行上下文在哪里运行,等等
这是您给我们的评论:
- 您正在从线程中使用
order_response_queue_
,没有任何锁定。这真的不行,因为 do_write
线程上没有任何东西推送到那个 queue,所以其他线程必须这样做。
老实说,我认为最好的起点是删除线程。您似乎使用它们的主要原因是:
到“保持循环运行ning”,这已经是异步执行上下文的一个特性。但是,您已经从完成处理程序链接(例如再次调用 do_write
),所以没关系
能够“等待”queue 上的消息:
while (order_response_queue_.empty()) {
}
通常的做法是在推送第一条消息时有条件地启动循环,并在 queue 为空时让它“停止”。无论如何,这避免了繁忙的旋转。
只需将 start
替换为
void start() {
post(strand_, [this, self = shared_from_this()] {
if (!order_response_queue_.empty()) {
do_write();
}
do_read();
});
}
还有类似的东西
void enqueue_response(Message stuff) {
post(strand_, [this, stuff, self = shared_from_this()] {
order_response_queue_.push(std::move(stuff));
if (1 == order_response_queue_.size()) { // not idle/already writing
do_write();
}
});
}
请注意,每个使用共享 queues 的操作都在逻辑链上同步,以防您使用多线程执行上下文。
草图
我不太明白 Serialize/Deserialize 背后与 Message 相关的想法,所以这可能行不通 - 虽然我觉得它会给你很多关于如何简化事情的线索:
#include <thread>
#include <boost/asio.hpp>
#include <iostream>
#include <memory>
#include <queue>
using boost::asio::ip::tcp;
struct Message {
struct header_t {
// TODO add magic bytes
std::uint32_t body_len = 0; // network byte order
};
enum { header_length = sizeof(header_t) };
std::string _contents{header_length};
size_t body_length() const {
return length() - header_length;
}
void body_length(size_t n) {
_contents.resize(n+header_length);
encode_header();
}
bool decode_header() {
assert(_contents.length() >= header_length);
auto &header = *reinterpret_cast<header_t *>(_contents.data());
body_length(ntohl(header.body_len));
// TODO verify magic bytes
return true;
}
void encode_header() {
assert(_contents.length() >= header_length);
auto &header = *reinterpret_cast<header_t *>(_contents.data());
header.body_len = htonl(_contents.length() - header_length);
}
std::string SerializeToString() const { return _contents; }
void DeserializeFromChar(char const*) { } // TODO IMPLEMENTATION
char *data() { return _contents.data(); }
char *body() { return data() + header_length; }
char const *data() const { return _contents.data(); }
char const *body() const { return data() + header_length; }
size_t length() const { return _contents.size(); }
bool operator<(Message const &rhs) const { return _contents < rhs._contents; }
};
struct X : std::enable_shared_from_this<X> {
std::queue<Message> order_request_queue_, order_response_queue_;
void start() {
post(strand_, [this, self = shared_from_this()] {
if (!order_response_queue_.empty()) {
do_write();
}
do_read();
});
}
void enqueue_response(Message stuff) {
post(strand_, [this, stuff, self = shared_from_this()] {
order_response_queue_.push(std::move(stuff));
if (1 == order_response_queue_.size()) { // not idle/already writing
do_write();
}
});
}
void do_write() {
auto self(shared_from_this());
auto order_response = std::move(order_response_queue_.front());
order_response_queue_.pop();
Message msg;
try {
auto m = order_response.SerializeToString();
msg.body_length(std::strlen(m.c_str()));
std::memcpy(msg.body(), m.c_str(), std::strlen(m.c_str()));
msg.encode_header();
assert(msg.length() <= data_.size());
std::memcpy(data_.data(), msg.data(), msg.length());
// std::memcpy(static_cast<void *>(msg.data()), data_,
// msg.length());
} catch (std::exception &e) {
std::cout << e.what();
}
std::cout << "write: " << msg.body() << "\n";
boost::asio::async_write(
socket_, boost::asio::buffer(data_, msg.length()),
[this, self](boost::system::error_code ec, std::size_t /*length*/) {
if (ec) {
std::cerr << "write error:" << ec.value()
<< " message: " << ec.message() << "\n";
} else if (!order_response_queue_.empty()) {
do_write();
}
});
}
Message res, req_;
void do_read() {
auto self(shared_from_this());
Message msg;
socket_.async_read_some(
boost::asio::buffer(res.data(), res.header_length),
[this, self](boost::system::error_code ec, std::size_t /*length*/) {
if (!ec && res.decode_header()) {
std::string st(res.body());
boost::asio::async_read(
socket_,
boost::asio::buffer(res.body(), res.body_length()),
[this](boost::system::error_code ec,
std::size_t /*length*/) {
if (!ec) {
std::cout << "read " << res.body() << "\n";
req_.DeserializeFromChar(res.body());
order_request_queue_.push(req_);
} else {
if (ec) {
std::cerr << "read error:" << ec.value()
<< " message: " << ec.message()
<< "\n";
}
socket_.close();
}
});
}
do_read();
});
}
using Executor = boost::asio::system_executor;
boost::asio::strand<Executor> strand_ { Executor{} };
tcp::socket socket_ { strand_ };
std::array<char, 1024> data_;
// quick & dirty connection
X() { socket_.connect({{}, 8989}); }
};
int main() {
auto x = std::make_shared<X>();
x->start();
{
Message demo;
std::string_view greeting{"Hello world!"};
demo.body_length(greeting.length());
std::copy(greeting.begin(), greeting.end(), demo.body());
x->enqueue_response({});
}
boost::asio::query(boost::asio::system_executor(),
boost::asio::execution::context)
.join();
}
我试图构建一个服务器-客户端应用程序,服务器端是带有 boost::asio 异步的 c++,但是当我是 运行 应用程序时,它在 1 次读取和两次写入后卡住了。
这是我删除 do_write
函数调用时的相关代码 我收到了我发送的所有 5 条消息
编辑 当我 运行 调试器时,我能够接收所有五个消息并发送所有响应
void start()
{
std::thread t1([this](){do_write();});
t1.detach();
std::thread t2([this](){do_read();});
t2.detach();
}
void do_write()
{
auto self(shared_from_this());
Message msg;
while(order_response_queue_.empty())
{}
auto order_response = order_response_queue_.front();
order_response_queue_.pop();
try {
auto m = order_response.SerializeToString();
msg.body_length(std::strlen(m.c_str()));
std::memcpy(msg.body(), m.c_str(), std::strlen(m.c_str()));
msg.encode_header();
std::memcpy(data_,msg.data(), msg.length());
//std::memcpy(static_cast<void *>(msg.data()), data_, msg.length());
} catch (std::exception& e )
{
std::cout << e.what();
}
std::cout <<"write: " << msg.body() << "\n";
boost::asio::async_write(socket_, boost::asio::buffer(data_,msg.length()),
[this, self](boost::system::error_code ec, std::size_t /*length*/)
{
if (ec){
std::cerr << "write error:" << ec.value() << " message: " << ec.message() << "\n";
}
do_write();
});
}
void do_read()
{
auto self(shared_from_this());
Message msg;
socket_.async_read_some(boost::asio::buffer(res.data(), res.header_length),
[this, self](boost::system::error_code ec, std::size_t length) {
if (!ec && res.decode_header()) {
std::string st(res.body());
boost::asio::async_read(socket_, boost::asio::buffer(res.body(), res.body_length()), [this](boost::system::error_code ec, std::size_t length) {
if (!ec) {
std::cout << "read " << res.body() << "\n";
req_.DeserializeFromChar(res.body());
order_request_queue_.push(req_);
} else {
if (ec) {
std::cerr << "read error:" << ec.value() << " message: "
<< ec.message() << "\n";
}
socket_.close();
}
});
}
do_read();
});
}
这里是 io_service
class Server
{
public:
Server(boost::asio::io_service& io_service, short port,std::queue<OrderRequest> &order_request_queue,
std::queue<OrderResponse> &order_response_queue)
: acceptor_(io_service, tcp::endpoint(tcp::v4(), port)),
socket_(io_service) , order_response_queue_(order_response_queue), order_request_queue_(order_request_queue)
{
do_accept();
}
private:
void do_accept(){
acceptor_.async_accept(socket_,
[this](boost::system::error_code ec) {
if (!ec) {
std::cout << "accept connection\n";
std::make_shared<Session>(std::move(socket_),order_request_queue_, order_response_queue_)->start();
}
do_accept();
});
}
tcp::acceptor acceptor_;
tcp::socket socket_;
std::queue<OrderResponse> &order_response_queue_;
std::queue<OrderRequest> &order_request_queue_;
};
你应该制作样本 self-contained。
- 什么是
Message
, data_
从哪里来,res
从哪里来- 为什么
data_
与msg.length()
, 一起使用
- 是否包括 header 长度,
req_
和res
有什么关系- 这些的生命周期是多少。
- 为什么要对线程使用异步
- 执行上下文在哪里运行,等等
这是您给我们的评论:
- 您正在从线程中使用
order_response_queue_
,没有任何锁定。这真的不行,因为do_write
线程上没有任何东西推送到那个 queue,所以其他线程必须这样做。
老实说,我认为最好的起点是删除线程。您似乎使用它们的主要原因是:
到“保持循环运行ning”,这已经是异步执行上下文的一个特性。但是,您已经从完成处理程序链接(例如再次调用
do_write
),所以没关系能够“等待”queue 上的消息:
while (order_response_queue_.empty()) { }
通常的做法是在推送第一条消息时有条件地启动循环,并在 queue 为空时让它“停止”。无论如何,这避免了繁忙的旋转。 只需将
start
替换为void start() { post(strand_, [this, self = shared_from_this()] { if (!order_response_queue_.empty()) { do_write(); } do_read(); }); }
还有类似的东西
void enqueue_response(Message stuff) { post(strand_, [this, stuff, self = shared_from_this()] { order_response_queue_.push(std::move(stuff)); if (1 == order_response_queue_.size()) { // not idle/already writing do_write(); } }); }
请注意,每个使用共享 queues 的操作都在逻辑链上同步,以防您使用多线程执行上下文。
草图
我不太明白 Serialize/Deserialize 背后与 Message 相关的想法,所以这可能行不通 - 虽然我觉得它会给你很多关于如何简化事情的线索:
#include <thread>
#include <boost/asio.hpp>
#include <iostream>
#include <memory>
#include <queue>
using boost::asio::ip::tcp;
struct Message {
struct header_t {
// TODO add magic bytes
std::uint32_t body_len = 0; // network byte order
};
enum { header_length = sizeof(header_t) };
std::string _contents{header_length};
size_t body_length() const {
return length() - header_length;
}
void body_length(size_t n) {
_contents.resize(n+header_length);
encode_header();
}
bool decode_header() {
assert(_contents.length() >= header_length);
auto &header = *reinterpret_cast<header_t *>(_contents.data());
body_length(ntohl(header.body_len));
// TODO verify magic bytes
return true;
}
void encode_header() {
assert(_contents.length() >= header_length);
auto &header = *reinterpret_cast<header_t *>(_contents.data());
header.body_len = htonl(_contents.length() - header_length);
}
std::string SerializeToString() const { return _contents; }
void DeserializeFromChar(char const*) { } // TODO IMPLEMENTATION
char *data() { return _contents.data(); }
char *body() { return data() + header_length; }
char const *data() const { return _contents.data(); }
char const *body() const { return data() + header_length; }
size_t length() const { return _contents.size(); }
bool operator<(Message const &rhs) const { return _contents < rhs._contents; }
};
struct X : std::enable_shared_from_this<X> {
std::queue<Message> order_request_queue_, order_response_queue_;
void start() {
post(strand_, [this, self = shared_from_this()] {
if (!order_response_queue_.empty()) {
do_write();
}
do_read();
});
}
void enqueue_response(Message stuff) {
post(strand_, [this, stuff, self = shared_from_this()] {
order_response_queue_.push(std::move(stuff));
if (1 == order_response_queue_.size()) { // not idle/already writing
do_write();
}
});
}
void do_write() {
auto self(shared_from_this());
auto order_response = std::move(order_response_queue_.front());
order_response_queue_.pop();
Message msg;
try {
auto m = order_response.SerializeToString();
msg.body_length(std::strlen(m.c_str()));
std::memcpy(msg.body(), m.c_str(), std::strlen(m.c_str()));
msg.encode_header();
assert(msg.length() <= data_.size());
std::memcpy(data_.data(), msg.data(), msg.length());
// std::memcpy(static_cast<void *>(msg.data()), data_,
// msg.length());
} catch (std::exception &e) {
std::cout << e.what();
}
std::cout << "write: " << msg.body() << "\n";
boost::asio::async_write(
socket_, boost::asio::buffer(data_, msg.length()),
[this, self](boost::system::error_code ec, std::size_t /*length*/) {
if (ec) {
std::cerr << "write error:" << ec.value()
<< " message: " << ec.message() << "\n";
} else if (!order_response_queue_.empty()) {
do_write();
}
});
}
Message res, req_;
void do_read() {
auto self(shared_from_this());
Message msg;
socket_.async_read_some(
boost::asio::buffer(res.data(), res.header_length),
[this, self](boost::system::error_code ec, std::size_t /*length*/) {
if (!ec && res.decode_header()) {
std::string st(res.body());
boost::asio::async_read(
socket_,
boost::asio::buffer(res.body(), res.body_length()),
[this](boost::system::error_code ec,
std::size_t /*length*/) {
if (!ec) {
std::cout << "read " << res.body() << "\n";
req_.DeserializeFromChar(res.body());
order_request_queue_.push(req_);
} else {
if (ec) {
std::cerr << "read error:" << ec.value()
<< " message: " << ec.message()
<< "\n";
}
socket_.close();
}
});
}
do_read();
});
}
using Executor = boost::asio::system_executor;
boost::asio::strand<Executor> strand_ { Executor{} };
tcp::socket socket_ { strand_ };
std::array<char, 1024> data_;
// quick & dirty connection
X() { socket_.connect({{}, 8989}); }
};
int main() {
auto x = std::make_shared<X>();
x->start();
{
Message demo;
std::string_view greeting{"Hello world!"};
demo.body_length(greeting.length());
std::copy(greeting.begin(), greeting.end(), demo.body());
x->enqueue_response({});
}
boost::asio::query(boost::asio::system_executor(),
boost::asio::execution::context)
.join();
}