如何确保消息在多线程 Asio io_service 上按时间顺序入队?
How to ensure that the messages will be enqueued in chronological order on multithreaded Asio io_service?
在 Michael Caisse's cppcon talk 之后,我创建了一个连接处理程序 MyUserConnection
,它有一个 sendMessage
方法。 sendMessage
方法向队列添加一条消息,类似于 cppcon talk 中的 send()
。我的 sendMessage
方法以高间隔从连接处理程序外部的多个线程调用。消息必须按时间顺序排列。
当我 运行 我的代码只有一个 Asio io_service::run
调用(又名一个 io_service 线程)时,它 async_write
并按预期清空我的队列(FIFO ),但是,当有 4 个 io_service::run
调用时,就会出现问题,然后队列未填充或发送调用 未按时间顺序调用 .
class MyUserConnection : public std::enable_shared_from_this<MyUserConnection> {
public:
MyUserConnection(asio::io_service& io_service, SslSocket socket) :
service_(io_service),
socket_(std::move(socket)),
strand_(io_service) {
}
void sendMessage(std::string msg) {
auto self(shared_from_this());
service_.post(strand_.wrap([self, msg]() {
self->queueMessage(msg);
}));
}
private:
void queueMessage(const std::string& msg) {
bool writeInProgress = !sendPacketQueue_.empty();
sendPacketQueue_.push_back(msg);
if (!writeInProgress) {
startPacketSend();
}
}
void startPacketSend() {
auto self(shared_from_this());
asio::async_write(socket_,
asio::buffer(sendPacketQueue_.front().data(), sendPacketQueue_.front().length()),
strand_.wrap([self](const std::error_code& ec, std::size_t /*n*/) {
self->packetSendDone(ec);
}));
}
void packetSendDone(const std::error_code& ec) {
if (!ec) {
sendPacketQueue_.pop_front();
if (!sendPacketQueue_.empty()) { startPacketSend(); }
} else {
// end(); // My end call
}
}
asio::io_service& service_;
SslSocket socket_;
asio::io_service::strand strand_;
std::deque<std::string> sendPacketQueue_;
};
当 运行 在多线程 io_service 上连接处理程序时,我很确定我误解了 strand
和 io_service::post
。我也很确定消息没有按时间顺序排列,而不是消息没有按时间顺序排列 async_write。如何确保消息在 sendMessage
调用多线程 io_service 时按时间顺序排列?
在多核甚至单核抢占式 OS 上,您无法真正按照严格的时间顺序将消息送入队列。即使您使用互斥锁来同步对队列的写访问,一旦多个写入者等待互斥锁并且互斥锁变为空闲,就不再保证严格的顺序。充其量,等待写入线程获取互斥锁的顺序取决于实现(OS 代码相关),但最好假设它只是随机的。
话虽如此,严格的时间顺序首先是一个定义问题。为了解释这一点,假设您的 PC 有一些数字输出位(每个写入器线程 1 个),并且您将逻辑分析仪连接到这些位……想象一下,您在代码中选择了某个位置,在那里您切换了相应的位在您的排队功能中。即使该位切换仅在获取互斥量之前发生一条汇编指令,也有可能在编写程序代码接近该点时更改了顺序。您也可以事先将其设置为其他任意点(例如,当您进入入队功能时)。但是,同样的推理也适用。因此,严格的时间顺序本身就是一个定义问题。
有一个案例的类比,CPU 的中断控制器有多个输入,您试图构建一个系统来严格按时间顺序处理这些中断。即使所有中断输入都在同一时刻发出信号(一个开关,同时将它们全部拉到信号状态),也会发生一些顺序(例如,由硬件逻辑或仅由输入引脚处的噪声或由系统中断调度程序功能引起) (某些 CPU(例如 MIPS 4102)具有单个中断向量,汇编代码检查可能的中断源并分派给专用中断处理程序)。
这个类比有助于理解模式:它归结为同步系统上的异步输入。这本身就是一个众所周知的难题。
因此,您可能做的最好的事情就是为您的应用程序“严格排序”做出合适的定义并接受它。
然后,为了避免违反您的定义,您可以使用优先级队列而不是普通的 FIFO 数据类型,并使用一些原子计数器作为优先级:
- 在您选择的代码点,自动读取并递增计数器。
这是您的消息序列号。
- Assemble 您的消息并将其排入优先级队列,使用您的序列号作为优先级。
另一种可能的方法是定义“同时”的概念,它在队列的另一端是可检测的(因此,reader 不能假设一组“同时”消息的严格排序).这可以通过读取一些高频滴答计数来实现,并且所有那些具有相同“时间戳”的消息都被认为是同时在 reader 端。
如果您使用链,则保证顺序是您post对链进行操作的顺序。
当然,如果 post 线程之间存在某种“正确顺序”,那么您必须同步 它们 之间的 posting ,那是您的应用程序域。
这是一个现代化的、简化的 MyUserConnection class 和 self-contained 服务器测试程序:
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <deque>
#include <iostream>
#include <mutex>
namespace asio = boost::asio;
namespace ssl = asio::ssl;
using asio::ip::tcp;
using boost::system::error_code;
using SslSocket = ssl::stream<tcp::socket>;
class MyUserConnection : public std::enable_shared_from_this<MyUserConnection> {
public:
MyUserConnection(SslSocket&& socket) : socket_(std::move(socket)) {}
void start() {
std::cerr << "Handshake initiated" << std::endl;
socket_.async_handshake(ssl::stream_base::handshake_type::server,
[self = shared_from_this()](error_code ec) {
std::cerr << "Handshake complete" << std::endl;
});
}
void sendMessage(std::string msg) {
post(socket_.get_executor(),
[self = shared_from_this(), msg = std::move(msg)]() {
self->queueMessage(msg);
});
}
private:
void queueMessage(std::string msg) {
outbox_.push_back(std::move(msg));
if (outbox_.size() == 1)
sendLoop();
}
void sendLoop() {
std::cerr << "Sendloop " << outbox_.size() << std::endl;
if (outbox_.empty())
return;
asio::async_write( //
socket_, asio::buffer(outbox_.front()),
[this, self = shared_from_this()](error_code ec, std::size_t) {
if (!ec) {
outbox_.pop_front();
sendLoop();
} else {
end();
}
});
}
void end() {}
SslSocket socket_;
std::deque<std::string> outbox_;
};
int main() {
asio::thread_pool ioc;
ssl::context ctx(ssl::context::sslv23_server);
ctx.set_password_callback([](auto...) { return "test"; });
ctx.use_certificate_file("server.pem", ssl::context::file_format::pem);
ctx.use_private_key_file("server.pem", ssl::context::file_format::pem);
ctx.use_tmp_dh_file("dh2048.pem");
tcp::acceptor a(ioc, {{}, 8989u});
for (;;) {
auto s = a.accept(make_strand(ioc.get_executor()));
std::cerr << "accepted " << s.remote_endpoint() << std::endl;
auto sess = make_shared<MyUserConnection>(SslSocket(std::move(s), ctx));
sess->start();
for(int i = 0; i<30; ++i) {
post(ioc, [sess, i] {
std::string msg = "message #" + std::to_string(i) + "\n";
{
static std::mutex mx;
// Lock so console output is guaranteed in the same order
// as the sendMessage call
std::lock_guard lk(mx);
std::cout << "Sending " << msg << std::flush;
sess->sendMessage(std::move(msg));
}
});
}
break; // for online demo
}
ioc.join();
}
如果你运行它几次,你就会看到
- 线程的顺序post是不确定的(这取决于内核调度)
- 发送(和接收)消息的顺序完全它们post编辑的顺序。
在我的机器上查看现场演示 运行s:
在 Michael Caisse's cppcon talk 之后,我创建了一个连接处理程序 MyUserConnection
,它有一个 sendMessage
方法。 sendMessage
方法向队列添加一条消息,类似于 cppcon talk 中的 send()
。我的 sendMessage
方法以高间隔从连接处理程序外部的多个线程调用。消息必须按时间顺序排列。
当我 运行 我的代码只有一个 Asio io_service::run
调用(又名一个 io_service 线程)时,它 async_write
并按预期清空我的队列(FIFO ),但是,当有 4 个 io_service::run
调用时,就会出现问题,然后队列未填充或发送调用 未按时间顺序调用 .
class MyUserConnection : public std::enable_shared_from_this<MyUserConnection> {
public:
MyUserConnection(asio::io_service& io_service, SslSocket socket) :
service_(io_service),
socket_(std::move(socket)),
strand_(io_service) {
}
void sendMessage(std::string msg) {
auto self(shared_from_this());
service_.post(strand_.wrap([self, msg]() {
self->queueMessage(msg);
}));
}
private:
void queueMessage(const std::string& msg) {
bool writeInProgress = !sendPacketQueue_.empty();
sendPacketQueue_.push_back(msg);
if (!writeInProgress) {
startPacketSend();
}
}
void startPacketSend() {
auto self(shared_from_this());
asio::async_write(socket_,
asio::buffer(sendPacketQueue_.front().data(), sendPacketQueue_.front().length()),
strand_.wrap([self](const std::error_code& ec, std::size_t /*n*/) {
self->packetSendDone(ec);
}));
}
void packetSendDone(const std::error_code& ec) {
if (!ec) {
sendPacketQueue_.pop_front();
if (!sendPacketQueue_.empty()) { startPacketSend(); }
} else {
// end(); // My end call
}
}
asio::io_service& service_;
SslSocket socket_;
asio::io_service::strand strand_;
std::deque<std::string> sendPacketQueue_;
};
当 运行 在多线程 io_service 上连接处理程序时,我很确定我误解了 strand
和 io_service::post
。我也很确定消息没有按时间顺序排列,而不是消息没有按时间顺序排列 async_write。如何确保消息在 sendMessage
调用多线程 io_service 时按时间顺序排列?
在多核甚至单核抢占式 OS 上,您无法真正按照严格的时间顺序将消息送入队列。即使您使用互斥锁来同步对队列的写访问,一旦多个写入者等待互斥锁并且互斥锁变为空闲,就不再保证严格的顺序。充其量,等待写入线程获取互斥锁的顺序取决于实现(OS 代码相关),但最好假设它只是随机的。
话虽如此,严格的时间顺序首先是一个定义问题。为了解释这一点,假设您的 PC 有一些数字输出位(每个写入器线程 1 个),并且您将逻辑分析仪连接到这些位……想象一下,您在代码中选择了某个位置,在那里您切换了相应的位在您的排队功能中。即使该位切换仅在获取互斥量之前发生一条汇编指令,也有可能在编写程序代码接近该点时更改了顺序。您也可以事先将其设置为其他任意点(例如,当您进入入队功能时)。但是,同样的推理也适用。因此,严格的时间顺序本身就是一个定义问题。
有一个案例的类比,CPU 的中断控制器有多个输入,您试图构建一个系统来严格按时间顺序处理这些中断。即使所有中断输入都在同一时刻发出信号(一个开关,同时将它们全部拉到信号状态),也会发生一些顺序(例如,由硬件逻辑或仅由输入引脚处的噪声或由系统中断调度程序功能引起) (某些 CPU(例如 MIPS 4102)具有单个中断向量,汇编代码检查可能的中断源并分派给专用中断处理程序)。
这个类比有助于理解模式:它归结为同步系统上的异步输入。这本身就是一个众所周知的难题。
因此,您可能做的最好的事情就是为您的应用程序“严格排序”做出合适的定义并接受它。
然后,为了避免违反您的定义,您可以使用优先级队列而不是普通的 FIFO 数据类型,并使用一些原子计数器作为优先级:
- 在您选择的代码点,自动读取并递增计数器。 这是您的消息序列号。
- Assemble 您的消息并将其排入优先级队列,使用您的序列号作为优先级。
另一种可能的方法是定义“同时”的概念,它在队列的另一端是可检测的(因此,reader 不能假设一组“同时”消息的严格排序).这可以通过读取一些高频滴答计数来实现,并且所有那些具有相同“时间戳”的消息都被认为是同时在 reader 端。
如果您使用链,则保证顺序是您post对链进行操作的顺序。
当然,如果 post 线程之间存在某种“正确顺序”,那么您必须同步 它们 之间的 posting ,那是您的应用程序域。
这是一个现代化的、简化的 MyUserConnection class 和 self-contained 服务器测试程序:
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <deque>
#include <iostream>
#include <mutex>
namespace asio = boost::asio;
namespace ssl = asio::ssl;
using asio::ip::tcp;
using boost::system::error_code;
using SslSocket = ssl::stream<tcp::socket>;
class MyUserConnection : public std::enable_shared_from_this<MyUserConnection> {
public:
MyUserConnection(SslSocket&& socket) : socket_(std::move(socket)) {}
void start() {
std::cerr << "Handshake initiated" << std::endl;
socket_.async_handshake(ssl::stream_base::handshake_type::server,
[self = shared_from_this()](error_code ec) {
std::cerr << "Handshake complete" << std::endl;
});
}
void sendMessage(std::string msg) {
post(socket_.get_executor(),
[self = shared_from_this(), msg = std::move(msg)]() {
self->queueMessage(msg);
});
}
private:
void queueMessage(std::string msg) {
outbox_.push_back(std::move(msg));
if (outbox_.size() == 1)
sendLoop();
}
void sendLoop() {
std::cerr << "Sendloop " << outbox_.size() << std::endl;
if (outbox_.empty())
return;
asio::async_write( //
socket_, asio::buffer(outbox_.front()),
[this, self = shared_from_this()](error_code ec, std::size_t) {
if (!ec) {
outbox_.pop_front();
sendLoop();
} else {
end();
}
});
}
void end() {}
SslSocket socket_;
std::deque<std::string> outbox_;
};
int main() {
asio::thread_pool ioc;
ssl::context ctx(ssl::context::sslv23_server);
ctx.set_password_callback([](auto...) { return "test"; });
ctx.use_certificate_file("server.pem", ssl::context::file_format::pem);
ctx.use_private_key_file("server.pem", ssl::context::file_format::pem);
ctx.use_tmp_dh_file("dh2048.pem");
tcp::acceptor a(ioc, {{}, 8989u});
for (;;) {
auto s = a.accept(make_strand(ioc.get_executor()));
std::cerr << "accepted " << s.remote_endpoint() << std::endl;
auto sess = make_shared<MyUserConnection>(SslSocket(std::move(s), ctx));
sess->start();
for(int i = 0; i<30; ++i) {
post(ioc, [sess, i] {
std::string msg = "message #" + std::to_string(i) + "\n";
{
static std::mutex mx;
// Lock so console output is guaranteed in the same order
// as the sendMessage call
std::lock_guard lk(mx);
std::cout << "Sending " << msg << std::flush;
sess->sendMessage(std::move(msg));
}
});
}
break; // for online demo
}
ioc.join();
}
如果你运行它几次,你就会看到
- 线程的顺序post是不确定的(这取决于内核调度)
- 发送(和接收)消息的顺序完全它们post编辑的顺序。
在我的机器上查看现场演示 运行s: