asio::async_write 在大容量流上同步非常困难
asio::async_write incredibly difficult to synchronize on a high volume stream
我目前正在使用 Asio C++ 库并围绕它编写了一个客户端包装器。我最初的方法是非常基本的,只需要在一个方向上流动。要求已经改变,我已经切换到使用所有异步调用。除了 asio::async_write(...)
,大部分迁移都很容易。我使用了几种不同的方法,不可避免地 运行 陷入了每一种方法的僵局。
应用程序连续大量传输数据。我一直远离 strands,因为它们不会阻塞并且会导致内存问题,尤其是当服务器负载很重时。作业将备份并且应用程序堆无限增长。
所以我创建了一个阻塞队列只是为了找出在回调和/或阻塞事件之间使用锁会导致未知行为的困难方法。
wrapper 非常大class,所以我会尝试解释我目前的状态,希望能得到一些好的建议:
- 我有一个
asio::steady_timer
,它 运行 按照固定的时间表 将心跳消息直接推送 到阻塞队列中。
- 专用于读取事件并将它们推送到阻塞队列
的线程
- 线程专用于消耗阻塞队列
例如,在我的队列中,我有一个 queue::block()
和 queue::unblock()
,它们只是条件变量/互斥量的包装器。
std::thread consumer([this]() {
std::string message_buffer;
while (queue.pop(message_buffer)) {
queue.stage_block();
asio::async_write(*socket, asio::buffer(message_buffer), std::bind(&networking::handle_write, this, std::placeholders::_1, std::placeholders::_2));
queue.block();
}
});
void networking::handle_write(const std::error_code& error, size_t bytes_transferred) {
queue.unblock();
}
当套接字备份并且服务器由于当前负载而无法再接受数据时,队列会填满并导致死锁,其中 handle_write(...)
永远不会被调用。
另一种方法完全消除了消费者线程并依赖 handle_write(...)
弹出队列。像这样:
void networking::write(const std::string& data) {
if (!queue.closed()) {
std::stringstream stream_buffer;
stream_buffer << data << std::endl;
spdlog::get("console")->debug("pushing to queue {}", queue.size());
queue.push(stream_buffer.str());
if (queue.size() == 1) {
spdlog::get("console")->debug("handle_write: {}", stream_buffer.str());
asio::async_write(*socket, asio::buffer(stream_buffer.str()), std::bind(&networking::handle_write, this, std::placeholders::_1, std::placeholders::_2));
}
}
}
void networking::handle_write(const std::error_code& error, size_t bytes_transferred) {
std::string message;
queue.pop(message);
if (!queue.closed() && !queue.empty()) {
std::string front = queue.front();
asio::async_write(*socket, asio::buffer(queue.front()), std::bind(&networking::handle_write, this, std::placeholders::_1, std::placeholders::_2));
}
}
这也导致了死锁,并且显然导致了其他竞争问题。当我禁用我的心跳回调时,我完全没有问题。但是,心跳是必需的。
我做错了什么?什么是更好的方法?
看来我所有的痛苦完全来自于心跳。在我的异步写入操作的每个变体中禁用心跳似乎可以解决我的问题,所以这让我相信这可能是使用内置 asio::async_wait(...)
和 asio::steady_timer
.[=19 的结果=]
Asio 在内部同步其工作并在执行下一个作业之前等待作业完成。使用 asio::async_wait(...)
构建我的心跳功能是我的设计缺陷,因为它在等待挂起作业的同一线程上运行。当心跳等待 queue::push(...)
时,它与 Asio 造成了僵局。这可以解释为什么 asio::async_write(...)
完成处理程序在我的第一个示例中从未执行过。
解决方案是将心跳放在自己的线程上,让它独立于 Asio 工作。我仍在使用我的阻塞队列来同步对 asio::async_write(...)
的调用,但已修改我的消费者线程以使用 std::future
和 std::promise
。这将回调与我的消费者线程完全同步。
std::thread networking::heartbeat_worker() {
return std::thread([&]() {
while (socket_opened) {
spdlog::get("console")->trace("heartbeat pending");
write(heartbeat_message);
spdlog::get("console")->trace("heartbeat sent");
std::unique_lock<std::mutex> lock(mutex);
socket_closed_event.wait_for(lock, std::chrono::milliseconds(heartbeat_interval), [&]() {
return !socket_opened;
});
}
spdlog::get("console")->trace("heartbeat thread exited gracefully");
});
}
我目前正在使用 Asio C++ 库并围绕它编写了一个客户端包装器。我最初的方法是非常基本的,只需要在一个方向上流动。要求已经改变,我已经切换到使用所有异步调用。除了 asio::async_write(...)
,大部分迁移都很容易。我使用了几种不同的方法,不可避免地 运行 陷入了每一种方法的僵局。
应用程序连续大量传输数据。我一直远离 strands,因为它们不会阻塞并且会导致内存问题,尤其是当服务器负载很重时。作业将备份并且应用程序堆无限增长。
所以我创建了一个阻塞队列只是为了找出在回调和/或阻塞事件之间使用锁会导致未知行为的困难方法。
wrapper 非常大class,所以我会尝试解释我目前的状态,希望能得到一些好的建议:
- 我有一个
asio::steady_timer
,它 运行 按照固定的时间表 将心跳消息直接推送 到阻塞队列中。 - 专用于读取事件并将它们推送到阻塞队列 的线程
- 线程专用于消耗阻塞队列
例如,在我的队列中,我有一个 queue::block()
和 queue::unblock()
,它们只是条件变量/互斥量的包装器。
std::thread consumer([this]() {
std::string message_buffer;
while (queue.pop(message_buffer)) {
queue.stage_block();
asio::async_write(*socket, asio::buffer(message_buffer), std::bind(&networking::handle_write, this, std::placeholders::_1, std::placeholders::_2));
queue.block();
}
});
void networking::handle_write(const std::error_code& error, size_t bytes_transferred) {
queue.unblock();
}
当套接字备份并且服务器由于当前负载而无法再接受数据时,队列会填满并导致死锁,其中 handle_write(...)
永远不会被调用。
另一种方法完全消除了消费者线程并依赖 handle_write(...)
弹出队列。像这样:
void networking::write(const std::string& data) {
if (!queue.closed()) {
std::stringstream stream_buffer;
stream_buffer << data << std::endl;
spdlog::get("console")->debug("pushing to queue {}", queue.size());
queue.push(stream_buffer.str());
if (queue.size() == 1) {
spdlog::get("console")->debug("handle_write: {}", stream_buffer.str());
asio::async_write(*socket, asio::buffer(stream_buffer.str()), std::bind(&networking::handle_write, this, std::placeholders::_1, std::placeholders::_2));
}
}
}
void networking::handle_write(const std::error_code& error, size_t bytes_transferred) {
std::string message;
queue.pop(message);
if (!queue.closed() && !queue.empty()) {
std::string front = queue.front();
asio::async_write(*socket, asio::buffer(queue.front()), std::bind(&networking::handle_write, this, std::placeholders::_1, std::placeholders::_2));
}
}
这也导致了死锁,并且显然导致了其他竞争问题。当我禁用我的心跳回调时,我完全没有问题。但是,心跳是必需的。
我做错了什么?什么是更好的方法?
看来我所有的痛苦完全来自于心跳。在我的异步写入操作的每个变体中禁用心跳似乎可以解决我的问题,所以这让我相信这可能是使用内置 asio::async_wait(...)
和 asio::steady_timer
.[=19 的结果=]
Asio 在内部同步其工作并在执行下一个作业之前等待作业完成。使用 asio::async_wait(...)
构建我的心跳功能是我的设计缺陷,因为它在等待挂起作业的同一线程上运行。当心跳等待 queue::push(...)
时,它与 Asio 造成了僵局。这可以解释为什么 asio::async_write(...)
完成处理程序在我的第一个示例中从未执行过。
解决方案是将心跳放在自己的线程上,让它独立于 Asio 工作。我仍在使用我的阻塞队列来同步对 asio::async_write(...)
的调用,但已修改我的消费者线程以使用 std::future
和 std::promise
。这将回调与我的消费者线程完全同步。
std::thread networking::heartbeat_worker() {
return std::thread([&]() {
while (socket_opened) {
spdlog::get("console")->trace("heartbeat pending");
write(heartbeat_message);
spdlog::get("console")->trace("heartbeat sent");
std::unique_lock<std::mutex> lock(mutex);
socket_closed_event.wait_for(lock, std::chrono::milliseconds(heartbeat_interval), [&]() {
return !socket_opened;
});
}
spdlog::get("console")->trace("heartbeat thread exited gracefully");
});
}