C++ 中的高效消息工厂和处理程序
Efficient message factory and handler in C++
我们公司正在用 C++11 重写大部分遗留 C 代码。 (这也意味着我是一名学习 C++ 的 C 程序员)。我需要有关消息处理程序的建议。
我们有分布式系统 - 服务器进程通过 TCP 向客户端进程发送打包消息。
在 C 代码中是这样做的:
- 根据类型和子类型解析消息,它们始终是前 2 个字段
- call a handler as handler[type](Message *msg)
- handler creates temporary struct say, tmp_struct to hold the parsed values and ..
- calls subhandler[type][subtype](tmp_struct)
每个 type/subtype 只有一个处理程序。
转向 C++11 和多线程环境。我的基本想法是 -
1) 为每个type/subtype 组合注册一个处理器对象。这是
实际上是向量的向量 -
矢量<矢量>
class MsgProcessor {
// Factory function
virtual Message *create();
virtual Handler(Message *msg)
}
这将被不同的消息处理器继承
class AMsgProcessor : public MsgProcessor {
Message *create() override();
handler(Message *msg);
}
2) 通过查找向量中的向量来获取处理器。
使用重载的 create() 工厂函数获取消息。
这样我们就可以将实际消息和解析后的值保留在消息中。
3) 现在有点乱七八糟,这条消息应该发送到其他线程进行繁重的处理。为了避免再次在向量中查找,在消息中添加了一个指向 proc 的指针。
class Message {
const MsgProcessor *proc; // set to processor,
// which we got from the first lookup
// to get factory function.
};
所以其他线程会做
Message->proc->Handler(Message *);
这看起来很糟糕,但希望这有助于将消息处理程序与工厂分开。这是针对这种情况,当多个 type/subtype 想要创建相同的消息,但处理方式不同时。
我正在搜索这个并遇到了:
http://www.drdobbs.com/cpp/message-handling-without-dependencies/184429055?pgno=1
它提供了一种将消息与处理程序完全分离的方法。但我想知道我上面的简单方案是否会被视为可接受的设计。这也是实现我想要的错误方式吗?
与速度一样,效率是此应用程序最重要的要求。我们已经在做几个内存 Jumbs => 2 个向量 + 虚函数调用创建消息。处理程序有 2 个遵从,我猜从缓存的角度来看这并不好。
虽然你的要求不明确,但我想我有一个设计可能是你正在寻找的。
查看 http://coliru.stacked-crooked.com/a/f7f9d5e7d57e6261 以获得完整的示例。
它有以下组件:
- 用于消息处理器
IMessageProcessor
的接口 class。
- 表示消息的基 class。
Message
- 一个注册class,本质上是一个单例,用于存储与(类型,子类型)对对应的消息处理器。
Registrator
。它将映射存储在 unordered_map
中。您也可以稍微调整一下以获得更好的性能。 Registrator
的所有暴露的 API 都受到 std::mutex
. 的保护
- MessageProcessor 的具体实现。
AMsgProcessor
和 BMsgProcessor
在这种情况下。
simulate
函数来展示它们是如何组合在一起的。
也在这里粘贴代码:
/*
*
*/
#include <iostream>
#include <vector>
#include <tuple>
#include <mutex>
#include <memory>
#include <cassert>
#include <unordered_map>
class Message;
class IMessageProcessor
{
public:
virtual Message* create() = 0;
virtual void handle_message(Message*) = 0;
virtual ~IMessageProcessor() {};
};
/*
* Base message class
*/
class Message
{
public:
virtual void populate() = 0;
virtual ~Message() {};
};
using Type = int;
using SubType = int;
using TypeCombo = std::pair<Type, SubType>;
using IMsgProcUptr = std::unique_ptr<IMessageProcessor>;
/*
* Registrator class maintains all the registrations in an
* unordered_map.
* This class owns the MessageProcessor instance inside the
* unordered_map.
*/
class Registrator
{
public:
static Registrator* instance();
// Diable other types of construction
Registrator(const Registrator&) = delete;
void operator=(const Registrator&) = delete;
public:
// TypeCombo assumed to be cheap to copy
template <typename ProcT, typename... Args>
std::pair<bool, IMsgProcUptr> register_proc(TypeCombo typ, Args&&... args)
{
auto proc = std::make_unique<ProcT>(std::forward<Args>(args)...);
bool ok;
{
std::lock_guard<std::mutex> _(lock_);
std::tie(std::ignore, ok) = registrations_.insert(std::make_pair(typ, std::move(proc)));
}
return (ok == true) ? std::make_pair(true, nullptr) :
// Return the heap allocated instance back
// to the caller if the insert failed.
// The caller now owns the Processor
std::make_pair(false, std::move(proc));
}
// Get the processor corresponding to TypeCombo
// IMessageProcessor passed is non-owning pointer
// i.e the caller SHOULD not delete it or own it
std::pair<bool, IMessageProcessor*> processor(TypeCombo typ)
{
std::lock_guard<std::mutex> _(lock_);
auto fitr = registrations_.find(typ);
if (fitr == registrations_.end()) {
return std::make_pair(false, nullptr);
}
return std::make_pair(true, fitr->second.get());
}
// TypeCombo assumed to be cheap to copy
bool is_type_used(TypeCombo typ)
{
std::lock_guard<std::mutex> _(lock_);
return registrations_.find(typ) != registrations_.end();
}
bool deregister_proc(TypeCombo typ)
{
std::lock_guard<std::mutex> _(lock_);
return registrations_.erase(typ) == 1;
}
private:
Registrator() = default;
private:
std::mutex lock_;
/*
* Should be replaced with a concurrent map if at all this
* data structure is the main contention point (which I find
* very unlikely).
*/
struct HashTypeCombo
{
public:
std::size_t operator()(const TypeCombo& typ) const noexcept
{
return std::hash<decltype(typ.first)>()(typ.first) ^
std::hash<decltype(typ.second)>()(typ.second);
}
};
std::unordered_map<TypeCombo, IMsgProcUptr, HashTypeCombo> registrations_;
};
Registrator* Registrator::instance()
{
static Registrator inst;
return &inst;
/*
* OR some other DCLP based instance creation
* if lifetime or creation of static is an issue
*/
}
// Define some message processors
class AMsgProcessor final : public IMessageProcessor
{
public:
class AMsg final : public Message
{
public:
void populate() override {
std::cout << "Working on AMsg\n";
}
AMsg() = default;
~AMsg() = default;
};
Message* create() override
{
std::unique_ptr<AMsg> ptr(new AMsg);
return ptr.release();
}
void handle_message(Message* msg) override
{
assert (msg);
auto my_msg = static_cast<AMsg*>(msg);
//.... process my_msg ?
//.. probably being called in some other thread
// Who owns the msg ??
(void)my_msg; // only for suppressing warning
delete my_msg;
return;
}
~AMsgProcessor();
};
AMsgProcessor::~AMsgProcessor()
{
}
class BMsgProcessor final : public IMessageProcessor
{
public:
class BMsg final : public Message
{
public:
void populate() override {
std::cout << "Working on BMsg\n";
}
BMsg() = default;
~BMsg() = default;
};
Message* create() override
{
std::unique_ptr<BMsg> ptr(new BMsg);
return ptr.release();
}
void handle_message(Message* msg) override
{
assert (msg);
auto my_msg = static_cast<BMsg*>(msg);
//.... process my_msg ?
//.. probably being called in some other thread
//Who owns the msg ??
(void)my_msg; // only for suppressing warning
delete my_msg;
return;
}
~BMsgProcessor();
};
BMsgProcessor::~BMsgProcessor()
{
}
TypeCombo read_from_network()
{
return {1, 2};
}
struct ParsedData {
};
Message* populate_message(Message* msg, ParsedData& pdata)
{
// Do something with the message
// Calling a dummy populate method now
msg->populate();
(void)pdata;
return msg;
}
void simulate()
{
TypeCombo typ = read_from_network();
bool ok;
IMessageProcessor* proc = nullptr;
std::tie(ok, proc) = Registrator::instance()->processor(typ);
if (!ok) {
std::cerr << "FATAL!!!" << std::endl;
return;
}
ParsedData parsed_data;
//..... populate parsed_data here ....
proc->handle_message(populate_message(proc->create(), parsed_data));
return;
}
int main() {
/*
* TODO: Not making use or checking the return types after calling register
* its a must in production code!!
*/
// Register AMsgProcessor
Registrator::instance()->register_proc<AMsgProcessor>(std::make_pair(1, 1));
Registrator::instance()->register_proc<BMsgProcessor>(std::make_pair(1, 2));
simulate();
return 0;
}
更新 1
这里的主要混乱来源似乎是因为 even 系统的架构是未知的。
任何自我尊重的事件系统架构如下所示:
- 轮询套接字描述符的线程池。
- 用于处理定时器相关事件的线程池。
- 用于执行长时间阻塞作业的线程数量相对较少(取决于应用程序)。
所以,在你的情况下:
- 您将在执行
epoll_wait
或 select
或 poll
的线程上获得网络事件。
- 完全读取数据包并使用
Registrator::get_processor
调用获取处理器。
注意:如果可以保证底层 unordered_map
不会被修改,即可以在没有任何锁定的情况下进行 get_processor
调用,即一旦我们开始就不会进行新的插入接收事件。
- 使用获得的处理器,我们可以获得
Message
并填充它。
- 现在,这是我不太确定您想要的部分。此时,我们有了
processor
,您可以从当前线程调用 handle_message
,即正在执行 epoll_wait
的线程,或者通过发布作业将其分派到另一个线程(处理器和消息)到那个线程接收队列。
我们公司正在用 C++11 重写大部分遗留 C 代码。 (这也意味着我是一名学习 C++ 的 C 程序员)。我需要有关消息处理程序的建议。
我们有分布式系统 - 服务器进程通过 TCP 向客户端进程发送打包消息。
在 C 代码中是这样做的: - 根据类型和子类型解析消息,它们始终是前 2 个字段
- call a handler as handler[type](Message *msg)
- handler creates temporary struct say, tmp_struct to hold the parsed values and ..
- calls subhandler[type][subtype](tmp_struct)
每个 type/subtype 只有一个处理程序。
转向 C++11 和多线程环境。我的基本想法是 -
1) 为每个type/subtype 组合注册一个处理器对象。这是
实际上是向量的向量 -
矢量<矢量>
class MsgProcessor {
// Factory function
virtual Message *create();
virtual Handler(Message *msg)
}
这将被不同的消息处理器继承
class AMsgProcessor : public MsgProcessor {
Message *create() override();
handler(Message *msg);
}
2) 通过查找向量中的向量来获取处理器。 使用重载的 create() 工厂函数获取消息。 这样我们就可以将实际消息和解析后的值保留在消息中。
3) 现在有点乱七八糟,这条消息应该发送到其他线程进行繁重的处理。为了避免再次在向量中查找,在消息中添加了一个指向 proc 的指针。
class Message {
const MsgProcessor *proc; // set to processor,
// which we got from the first lookup
// to get factory function.
};
所以其他线程会做
Message->proc->Handler(Message *);
这看起来很糟糕,但希望这有助于将消息处理程序与工厂分开。这是针对这种情况,当多个 type/subtype 想要创建相同的消息,但处理方式不同时。
我正在搜索这个并遇到了:
http://www.drdobbs.com/cpp/message-handling-without-dependencies/184429055?pgno=1
它提供了一种将消息与处理程序完全分离的方法。但我想知道我上面的简单方案是否会被视为可接受的设计。这也是实现我想要的错误方式吗?
与速度一样,效率是此应用程序最重要的要求。我们已经在做几个内存 Jumbs => 2 个向量 + 虚函数调用创建消息。处理程序有 2 个遵从,我猜从缓存的角度来看这并不好。
虽然你的要求不明确,但我想我有一个设计可能是你正在寻找的。
查看 http://coliru.stacked-crooked.com/a/f7f9d5e7d57e6261 以获得完整的示例。
它有以下组件:
- 用于消息处理器
IMessageProcessor
的接口 class。 - 表示消息的基 class。
Message
- 一个注册class,本质上是一个单例,用于存储与(类型,子类型)对对应的消息处理器。
Registrator
。它将映射存储在unordered_map
中。您也可以稍微调整一下以获得更好的性能。Registrator
的所有暴露的 API 都受到std::mutex
. 的保护
- MessageProcessor 的具体实现。
AMsgProcessor
和BMsgProcessor
在这种情况下。 simulate
函数来展示它们是如何组合在一起的。
也在这里粘贴代码:
/*
*
*/
#include <iostream>
#include <vector>
#include <tuple>
#include <mutex>
#include <memory>
#include <cassert>
#include <unordered_map>
class Message;
class IMessageProcessor
{
public:
virtual Message* create() = 0;
virtual void handle_message(Message*) = 0;
virtual ~IMessageProcessor() {};
};
/*
* Base message class
*/
class Message
{
public:
virtual void populate() = 0;
virtual ~Message() {};
};
using Type = int;
using SubType = int;
using TypeCombo = std::pair<Type, SubType>;
using IMsgProcUptr = std::unique_ptr<IMessageProcessor>;
/*
* Registrator class maintains all the registrations in an
* unordered_map.
* This class owns the MessageProcessor instance inside the
* unordered_map.
*/
class Registrator
{
public:
static Registrator* instance();
// Diable other types of construction
Registrator(const Registrator&) = delete;
void operator=(const Registrator&) = delete;
public:
// TypeCombo assumed to be cheap to copy
template <typename ProcT, typename... Args>
std::pair<bool, IMsgProcUptr> register_proc(TypeCombo typ, Args&&... args)
{
auto proc = std::make_unique<ProcT>(std::forward<Args>(args)...);
bool ok;
{
std::lock_guard<std::mutex> _(lock_);
std::tie(std::ignore, ok) = registrations_.insert(std::make_pair(typ, std::move(proc)));
}
return (ok == true) ? std::make_pair(true, nullptr) :
// Return the heap allocated instance back
// to the caller if the insert failed.
// The caller now owns the Processor
std::make_pair(false, std::move(proc));
}
// Get the processor corresponding to TypeCombo
// IMessageProcessor passed is non-owning pointer
// i.e the caller SHOULD not delete it or own it
std::pair<bool, IMessageProcessor*> processor(TypeCombo typ)
{
std::lock_guard<std::mutex> _(lock_);
auto fitr = registrations_.find(typ);
if (fitr == registrations_.end()) {
return std::make_pair(false, nullptr);
}
return std::make_pair(true, fitr->second.get());
}
// TypeCombo assumed to be cheap to copy
bool is_type_used(TypeCombo typ)
{
std::lock_guard<std::mutex> _(lock_);
return registrations_.find(typ) != registrations_.end();
}
bool deregister_proc(TypeCombo typ)
{
std::lock_guard<std::mutex> _(lock_);
return registrations_.erase(typ) == 1;
}
private:
Registrator() = default;
private:
std::mutex lock_;
/*
* Should be replaced with a concurrent map if at all this
* data structure is the main contention point (which I find
* very unlikely).
*/
struct HashTypeCombo
{
public:
std::size_t operator()(const TypeCombo& typ) const noexcept
{
return std::hash<decltype(typ.first)>()(typ.first) ^
std::hash<decltype(typ.second)>()(typ.second);
}
};
std::unordered_map<TypeCombo, IMsgProcUptr, HashTypeCombo> registrations_;
};
Registrator* Registrator::instance()
{
static Registrator inst;
return &inst;
/*
* OR some other DCLP based instance creation
* if lifetime or creation of static is an issue
*/
}
// Define some message processors
class AMsgProcessor final : public IMessageProcessor
{
public:
class AMsg final : public Message
{
public:
void populate() override {
std::cout << "Working on AMsg\n";
}
AMsg() = default;
~AMsg() = default;
};
Message* create() override
{
std::unique_ptr<AMsg> ptr(new AMsg);
return ptr.release();
}
void handle_message(Message* msg) override
{
assert (msg);
auto my_msg = static_cast<AMsg*>(msg);
//.... process my_msg ?
//.. probably being called in some other thread
// Who owns the msg ??
(void)my_msg; // only for suppressing warning
delete my_msg;
return;
}
~AMsgProcessor();
};
AMsgProcessor::~AMsgProcessor()
{
}
class BMsgProcessor final : public IMessageProcessor
{
public:
class BMsg final : public Message
{
public:
void populate() override {
std::cout << "Working on BMsg\n";
}
BMsg() = default;
~BMsg() = default;
};
Message* create() override
{
std::unique_ptr<BMsg> ptr(new BMsg);
return ptr.release();
}
void handle_message(Message* msg) override
{
assert (msg);
auto my_msg = static_cast<BMsg*>(msg);
//.... process my_msg ?
//.. probably being called in some other thread
//Who owns the msg ??
(void)my_msg; // only for suppressing warning
delete my_msg;
return;
}
~BMsgProcessor();
};
BMsgProcessor::~BMsgProcessor()
{
}
TypeCombo read_from_network()
{
return {1, 2};
}
struct ParsedData {
};
Message* populate_message(Message* msg, ParsedData& pdata)
{
// Do something with the message
// Calling a dummy populate method now
msg->populate();
(void)pdata;
return msg;
}
void simulate()
{
TypeCombo typ = read_from_network();
bool ok;
IMessageProcessor* proc = nullptr;
std::tie(ok, proc) = Registrator::instance()->processor(typ);
if (!ok) {
std::cerr << "FATAL!!!" << std::endl;
return;
}
ParsedData parsed_data;
//..... populate parsed_data here ....
proc->handle_message(populate_message(proc->create(), parsed_data));
return;
}
int main() {
/*
* TODO: Not making use or checking the return types after calling register
* its a must in production code!!
*/
// Register AMsgProcessor
Registrator::instance()->register_proc<AMsgProcessor>(std::make_pair(1, 1));
Registrator::instance()->register_proc<BMsgProcessor>(std::make_pair(1, 2));
simulate();
return 0;
}
更新 1
这里的主要混乱来源似乎是因为 even 系统的架构是未知的。
任何自我尊重的事件系统架构如下所示:
- 轮询套接字描述符的线程池。
- 用于处理定时器相关事件的线程池。
- 用于执行长时间阻塞作业的线程数量相对较少(取决于应用程序)。
所以,在你的情况下:
- 您将在执行
epoll_wait
或select
或poll
的线程上获得网络事件。 - 完全读取数据包并使用
Registrator::get_processor
调用获取处理器。 注意:如果可以保证底层unordered_map
不会被修改,即可以在没有任何锁定的情况下进行get_processor
调用,即一旦我们开始就不会进行新的插入接收事件。 - 使用获得的处理器,我们可以获得
Message
并填充它。 - 现在,这是我不太确定您想要的部分。此时,我们有了
processor
,您可以从当前线程调用handle_message
,即正在执行epoll_wait
的线程,或者通过发布作业将其分派到另一个线程(处理器和消息)到那个线程接收队列。