根据报文序列号分发响应报文
dispatch response packet according to packet sequence id
我有一个第三方服务器,我正在为它写一个dll接口,我的客户使用我的dll与服务器通信。
该协议使用长 tcp 连接,所有流量都来自此 tcp 连接。可能同时有sending/receiving多个数据包,比如同时有send_msg
和heart_beat
,所以我必须使用async_write/async_read 以防止阻塞操作。每个数据包都有其序列号。例如,我发送了一条序列号为==123的消息,那么我应该等待服务器响应一个序列号为==123的数据包。
UPDATE: 不保证服务器按顺序响应数据包。如果两个数据包按A
、B
的顺序发送,响应顺序可能是response_B
、response_A
。序列 ID 是识别数据包的唯一方法。
数据包看起来像:
4bytes size + 2 bytes crc check + 4 bytes SEQUENCE ID + ....
问题是使用我的 dll 的客户更喜欢以阻塞方式使用功能,他们不喜欢回调。例如,他们喜欢
bool DLL_EXPORT send_msg(...) {
// send msg via long_connection, the seq_id==123
// recv msg via long_connection, just want the packet with seq_id==123 (How?)
return if_msg_sent_successfully;
}
我用的是boost asio,不知道有没有boost的实用工具class,或者适合这种场景的设计模式,下面是我能想到的解决方案:
// use a global std::map<seq_id, packet_content>
std::map<int, std::string> map_received;
每次收到一个数据包,将seq_id
和packet_body
写入map_received,send_msg
函数看起来像
bool DLL_EXPORT send_msg(...) {
// async_send msg via long_connection, the seq_id==123
while(not_time_out) {
if(map_received.find(123) != map_received.end()) {
// get the packet and erase the 123 pair
}
Sleep(10); // prevent cpu cost
}
return if_msg_sent_successfully;
}
这个解决方案很丑陋,必须有更好的设计。有什么想法吗?
您可以使用 std::promise
和 std::future
(如果您还没有使用 C++11,则可以使用它们的 boost 对应物)。
这个想法是在每次发送请求时,将一个 std::shared_ptr<std::promise<bool>>
与当前序列 ID 作为键存储在映射中。
在阻塞发送函数中,您等待设置相应的 std::future<bool>
。
当收到响应包时,从map中取出对应的std::promise<bool>
并设置值,发送函数为"unblocked".
以下示例大致基于 Boost asio documentation 中的聊天客户端示例,并不完整(例如缺少连接部分,header 和 body 读取未拆分ETC。)。由于它不完整,我没有进行运行时测试,但它应该可以说明这个想法。
#include <thread>
#include <map>
#include <future>
#include <iostream>
#include <boost/asio.hpp>
class Message
{
public:
enum { header_length = 10 };
enum { max_body_length = 512 };
Message()
: body_length_(0)
{
}
const char* data() const
{
return data_;
}
char* data()
{
return data_;
}
std::size_t length() const
{
return header_length + body_length_;
}
const char* body() const
{
return data_ + header_length;
}
char* body()
{
return data_ + header_length;
}
private:
char data_[header_length + max_body_length];
std::size_t body_length_;
};
class Client
{
public:
Client(boost::asio::io_service& io_service)
: io_service(io_service),
socket(io_service),
current_sequence_id(0)
{}
bool blocking_send(const std::string& msg)
{
auto future = async_send(msg);
// blocking wait
return future.get();
}
void start_reading()
{
auto handler = [this](boost::system::error_code ec, std::size_t /*length*/)
{
if(!ec)
{
// parse response ...
int response_id = 0;
auto promise = map_received[response_id];
promise->set_value(true);
map_received.erase(response_id);
}
};
boost::asio::async_read(socket,
boost::asio::buffer(read_msg_.data(), Message::header_length),
handler);
}
void connect()
{
// ...
start_reading();
}
private:
std::future<bool> async_send(const std::string& msg)
{
auto promise = std::make_shared<std::promise<bool>>();
auto handler = [=](boost::system::error_code ec, std::size_t /*length*/){std::cout << ec << std::endl;};
boost::asio::async_write(socket,
boost::asio::buffer(msg),
handler);
// store promise in map
map_received[current_sequence_id] = promise;
current_sequence_id++;
return promise->get_future();
}
boost::asio::io_service& io_service;
boost::asio::ip::tcp::socket socket;
std::map<int, std::shared_ptr<std::promise<bool>>> map_received;
int current_sequence_id;
Message read_msg_;
};
int main()
{
boost::asio::io_service io_service;
Client client(io_service);
std::thread t([&io_service](){ io_service.run(); });
client.connect();
client.blocking_send("dummy1");
client.blocking_send("dummy2");
return 0;
}
我有一个第三方服务器,我正在为它写一个dll接口,我的客户使用我的dll与服务器通信。
该协议使用长 tcp 连接,所有流量都来自此 tcp 连接。可能同时有sending/receiving多个数据包,比如同时有send_msg
和heart_beat
,所以我必须使用async_write/async_read 以防止阻塞操作。每个数据包都有其序列号。例如,我发送了一条序列号为==123的消息,那么我应该等待服务器响应一个序列号为==123的数据包。
UPDATE: 不保证服务器按顺序响应数据包。如果两个数据包按A
、B
的顺序发送,响应顺序可能是response_B
、response_A
。序列 ID 是识别数据包的唯一方法。
数据包看起来像:
4bytes size + 2 bytes crc check + 4 bytes SEQUENCE ID + ....
问题是使用我的 dll 的客户更喜欢以阻塞方式使用功能,他们不喜欢回调。例如,他们喜欢
bool DLL_EXPORT send_msg(...) {
// send msg via long_connection, the seq_id==123
// recv msg via long_connection, just want the packet with seq_id==123 (How?)
return if_msg_sent_successfully;
}
我用的是boost asio,不知道有没有boost的实用工具class,或者适合这种场景的设计模式,下面是我能想到的解决方案:
// use a global std::map<seq_id, packet_content>
std::map<int, std::string> map_received;
每次收到一个数据包,将seq_id
和packet_body
写入map_received,send_msg
函数看起来像
bool DLL_EXPORT send_msg(...) {
// async_send msg via long_connection, the seq_id==123
while(not_time_out) {
if(map_received.find(123) != map_received.end()) {
// get the packet and erase the 123 pair
}
Sleep(10); // prevent cpu cost
}
return if_msg_sent_successfully;
}
这个解决方案很丑陋,必须有更好的设计。有什么想法吗?
您可以使用 std::promise
和 std::future
(如果您还没有使用 C++11,则可以使用它们的 boost 对应物)。
这个想法是在每次发送请求时,将一个 std::shared_ptr<std::promise<bool>>
与当前序列 ID 作为键存储在映射中。
在阻塞发送函数中,您等待设置相应的 std::future<bool>
。
当收到响应包时,从map中取出对应的std::promise<bool>
并设置值,发送函数为"unblocked".
以下示例大致基于 Boost asio documentation 中的聊天客户端示例,并不完整(例如缺少连接部分,header 和 body 读取未拆分ETC。)。由于它不完整,我没有进行运行时测试,但它应该可以说明这个想法。
#include <thread>
#include <map>
#include <future>
#include <iostream>
#include <boost/asio.hpp>
class Message
{
public:
enum { header_length = 10 };
enum { max_body_length = 512 };
Message()
: body_length_(0)
{
}
const char* data() const
{
return data_;
}
char* data()
{
return data_;
}
std::size_t length() const
{
return header_length + body_length_;
}
const char* body() const
{
return data_ + header_length;
}
char* body()
{
return data_ + header_length;
}
private:
char data_[header_length + max_body_length];
std::size_t body_length_;
};
class Client
{
public:
Client(boost::asio::io_service& io_service)
: io_service(io_service),
socket(io_service),
current_sequence_id(0)
{}
bool blocking_send(const std::string& msg)
{
auto future = async_send(msg);
// blocking wait
return future.get();
}
void start_reading()
{
auto handler = [this](boost::system::error_code ec, std::size_t /*length*/)
{
if(!ec)
{
// parse response ...
int response_id = 0;
auto promise = map_received[response_id];
promise->set_value(true);
map_received.erase(response_id);
}
};
boost::asio::async_read(socket,
boost::asio::buffer(read_msg_.data(), Message::header_length),
handler);
}
void connect()
{
// ...
start_reading();
}
private:
std::future<bool> async_send(const std::string& msg)
{
auto promise = std::make_shared<std::promise<bool>>();
auto handler = [=](boost::system::error_code ec, std::size_t /*length*/){std::cout << ec << std::endl;};
boost::asio::async_write(socket,
boost::asio::buffer(msg),
handler);
// store promise in map
map_received[current_sequence_id] = promise;
current_sequence_id++;
return promise->get_future();
}
boost::asio::io_service& io_service;
boost::asio::ip::tcp::socket socket;
std::map<int, std::shared_ptr<std::promise<bool>>> map_received;
int current_sequence_id;
Message read_msg_;
};
int main()
{
boost::asio::io_service io_service;
Client client(io_service);
std::thread t([&io_service](){ io_service.run(); });
client.connect();
client.blocking_send("dummy1");
client.blocking_send("dummy2");
return 0;
}