通用消息

Generic messaging

我正在使用 C++ 开发消息传递系统。我有;

class MessageData
{
public:
    typedef std::vector<std::shared_ptr<MessageData>> MessageList;

    virtual int getValue(std::shared_ptr<int>) { throw "Not implemented!"; };
    virtual float getValue(std::shared_ptr<float>) { throw "Not implemented!"; };
    virtual std::string getValue(std::shared_ptr<std::string>) { throw "Not implemented!"; };
    ...
    ...

    virtual ~MessageData() {};
};

template <typename T>
class Message : public MessageData
{
    T val;
public:
    static std::shared_ptr<Message<T>> Make(T val) { return std::make_shared<Message<T>>(val); };
    static T Get(std::shared_ptr<MessageData> in) { return in->getValue(std::make_shared<T>()); };
    Message(T i) { val = i; };
    T getValue(std::shared_ptr<T> out) override { return *out = val; }
    ~Message() {};
};

使用这些,我可以 send/receive 方便地使用例如 send/receive 不同长度的通用消息;

sendMessage(MessageData::MessageList{
                Message<std::string>::Make("paint"),
                Message<int>::Make(14),
                Message<float>::Make(129.3f),
                ...
            });

然后我得到值;

sendMessage(MessageData::MessageList data) {
    auto a = Message<std::string>::Get(data[0]);
    auto b = Message<int>::Get(data[1]);
    auto c = Message<float>::Get(data[2]);
    ...
}

缺点是我必须列出我需要在 MessageData 中使用的所有类型 class。这没什么大不了的,因为我可以限制我想要支持的类型,但我真的很好奇如何在不使用第 3 方库的情况下模板化类型列表。或者是否有一种完全不同且更好的方法,我可以使用类似的简洁语法和类型安全来传递消息?

使代码更通用的一种方法是:

template <typename ... Ts>
class MessageDataImp;

template <typename T>
class MessageDataImp<T>
{
public:
    virtual ~MessageDataImp() = default;
    virtual T getValue(std::shared_ptr<T>) { throw "Not implemented!"; };
};

template <typename T, typename ... Ts>
class MessageDataImp<T, Ts...> : public MessageDataImp<T>, public MessageDataImp<Ts...>
{
public:
    using MessageDataImp<T>::getValue;
    using MessageDataImp<Ts...>::getValue;
};

template <typename ... Ts>
class MessageDataTs : public MessageDataImp<Ts...>
{
public:
    typedef std::vector<std::shared_ptr<MessageDataTs<Ts...>>> MessageList;
};

using MessageData = MessageDataTs<int, float, std::string>;

假设它是一个简单的多reader、基于非优先队列的多写者消息总线,我想我会从这样的事情开始:-

请注意,我使用了 boost::variant/optional。如果您有可用的版本,这些可以很容易地替换为 std:: 版本。

我使用变体是因为它有效地满足了大多数具有编译时安全性的用例。

std/boost::任何版本都需要对您的总线用户进行大量(并且可能不受欢迎)的照顾。

#include <iostream>
#include <string>
#include <queue>
#include <thread>
#include <condition_variable>
#include <boost/variant.hpp>
#include <boost/optional.hpp>

template<class Mutex> auto get_lock(Mutex& m) { return std::unique_lock<Mutex>(m); }

template<class...Types>
struct message_bus
{
    using message_type = boost::variant<Types...>;

    void push(message_type msg) {
        auto lock = get_lock(mutex_);
        messages_.push(std::move(msg));
        lock.unlock();
        activity_.notify_one();
    }

    boost::optional<message_type> wait_pop()
    {
        boost::optional<message_type> result;
        auto lock = get_lock(mutex_);
        activity_.wait(lock, [this] { return this->stopped_ or not this->messages_.empty(); });
        if (not messages_.empty())
        {
            result = std::move(messages_.front());
            messages_.pop();
        }
        return result;
    }

    void signal_stop()
    {
        auto lock = get_lock(mutex_);
        stopped_ = true;
        lock.unlock();
        activity_.notify_all();
    }


    std::queue<message_type> messages_;
    std::mutex mutex_;
    std::condition_variable activity_;
    bool stopped_ = false;
};

static std::mutex emit_mutex;

template<class T>
void emit(const T& t)
{
    auto lock = get_lock(emit_mutex);
    std::cout << std::this_thread::get_id() << ": " << t << std::endl;;
}

int main()
{

    using bus_type = message_bus<std::string, int>;
    bus_type mb;

    std::vector<std::thread> threads;
    for (int i = 0 ; i < 10 ; ++i)
    {
        threads.emplace_back([&]
        {
            for(;;)
            {
                auto message = mb.wait_pop();
                if (not message)
                    break;
                boost::apply_visitor([](auto&& data) { emit(data); }, message.value());
            }
        });
    }

    for (int i = 0 ; i < 1000 ; ++i)
    {
        mb.push("string: " + std::to_string(i));
        mb.push(i);
    }
    mb.signal_stop();

    for (auto& t : threads) if (t.joinable()) t.join();

}

我想我已经为我的问题制定了一个不错的解决方案。

class MessageData {
public:
    typedef std::vector<std::shared_ptr<MessageData>> MessageList;
    virtual ~MessageData() {};
};

template<typename T>
class Message : public MessageData {
    T val;
public:
    template<typename U>
    friend U GetMessage(std::shared_ptr<MessageData> in);

    Message(T i) { val = i; };
};

template<typename T>
T GetMessage(std::shared_ptr<MessageData> in) {
    std::shared_ptr<Message<T>> tmp = std::dynamic_pointer_cast<Message<T>>(in);
    if (tmp) {
        return tmp->val;
    } 
    throw "Incorrect type!";
};

template<typename T>
std::shared_ptr<Message<T>> MakeMessage(T val)
{
    return std::make_shared<Message<T>>(val);
};

然后使用发送和接收值;

sendMessage(MessageData::MessageList{
                MakeMessage(std::string("paint")),
                MakeMessage(14),
                MakeMessage(129.3f),
                ...
            });

sendMessage(MessageData::MessageList data) {
    auto a = GetMessage<std::string>(data[0]);
    auto b = GetMessage<int>(data[1]);
    auto c = GetMessage<float>(data[2]);
    ...
}