从一个进程到另一个 C++ 的信号

Signal from one process to another C++

我知道标题有点宽泛,所以让我详细说明一下。
我有 2 个进程 运行,一个正在写入共享内存,另一个正在从中读取。
为了实现共享内存效果,我使用 boost::interprocess(顺便说一句,如果有更方便的库,请告诉我)。

所以我执行了以下操作:

//作家

#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/windows_shared_memory.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <iostream>

namespace ip = boost::interprocess;
class SharedMemory
{
public:
    template<typename OpenOrCreate>
    SharedMemory(OpenOrCreate criteria, const char* name, ip::mode_t mode, size_t size) :
        name_(name),
        sm_(std::make_shared<ip::windows_shared_memory>(criteria, name, mode, size))
    {
    }

    template<typename OpenOrCreate>
    SharedMemory(OpenOrCreate criteria, const char* name, ip::mode_t mode) :
        name_(name),
        sm_(std::make_shared<ip::windows_shared_memory>(criteria, name, mode))
    {
    }

    std::shared_ptr<ip::windows_shared_memory> getSM()
    {
        return sm_;
    }
private:
    std::function<void()> destroyer_;
    std::string name_;
    std::shared_ptr<ip::windows_shared_memory> sm_;
};


int main()
{
    SharedMemory creator(ip::create_only, "SharedMemory", ip::read_write, 10);
    ip::mapped_region region(*creator.getSM(), ip::read_write);
    std::memset(region.get_address(), 1, region.get_size());

    int status = system("reader.exe");
    std::cout << status << std::endl;
}

所以我正在创建共享内存,向其中写入 1,然后调用 reader exe。 (我跳过 reader 部分,因为它几乎相同,但不是写而是读)

这段代码工作正常,我写入内存,其他进程读取它并打印我的 1。
但是,如果我同时拥有这 2 个 exe 运行 并且我想写入内存然后通知其他进程有更新怎么办?如何从一个 exe/process 向另一个发送信号?

场景是我正在流式传输一些实时数据,写入内存,然后告诉其他进程有更新。

我认为确实有更方便的方法。

原则上,要在进程之间进行同步,您使用与在进程内部(线程之间)进行同步的所有相同方法:使用同步原语(mutex/critical 部分、条件变量、信号量、屏障等)。

另外,你需要有一个你同步的数据结构。这恰恰是目前的阿喀琉斯之踵。这里完全没有数据结构。

尽管您可以 使用您自己的逻辑进行原始字节访问,但我看不出使用高级库这样做有什么吸引力。相反,我会使用托管内存段,它允许您按名称查找或构造类型化对象。这可能包括您的同步原语。

事实上,您可以使用已内置所有同步功能的 message_queue 来加快该过程。

手动同步:编写器使用段管理器

我将提供可移植代码,因为我没有 windows 机器。首先让我们考虑一个数据结构。一个简单的例子是消息队列。让我们使用 deque<string>.

不完全是微不足道的数据结构,但好消息是 Boost Interprocess 附带了使事情正常进行的所有细节(使用进程间分配器)。

namespace Shared {

    using Segment = ip::managed_shared_memory;
    using Mgr     = Segment::segment_manager;
    template <typename T>
    using Alloc = bc::scoped_allocator_adaptor<ip::allocator<T, Mgr>>;
    template <typename T> using Deque = bc::deque<T, Alloc<T>>;
    using String = bc::basic_string<char, std::char_traits<char>, Alloc<char>>;

    using DataStructure = Deque<String>;

    class Memory {
      public:
        Memory(const char* name, size_t size)
            : name_(name)
            , sm_(ip::open_or_create, name, size)
            , data_(*sm_.find_or_construct<DataStructure>("data")(
                  sm_.get_segment_manager()))
        {
        }

        DataStructure&       get()       { return data_; } 
        DataStructure const& get() const { return data_; } 

      private:
        std::string    name_;
        Segment        sm_;
        DataStructure& data_;
    };

} // namespace Shared

好了,现在我们可以让 writer 变成这样了:

int main()
{
    Shared::Memory creator("SharedMemory", 10*1024*1024);

    creator.get().emplace_back("Hello");
    creator.get().emplace_back("World");

    std::cout << "Total queued: " << creator.get().size() << "\n";
}

这将打印例如

Total queued: 2
Total queued: 4
Total queued: 6

取决于你运行的次数。

Reader那边

现在让我们做 reader 的一面。其实大同小异,还是放在同一个主程序里吧:

int main(int argc, char**)
{
    Shared::Memory mem("SharedMemory", 10*1024*1024);
    auto& data = mem.get();

    bool is_reader = argc > 1;

    if (not is_reader) {
        data.emplace_back("Hello");
        data.emplace_back("World");
        std::cout << "Total queued: " << data.size() << "\n";
    } else {
        std::cout << "Found entries: " << data.size() << "\n";
        while (!data.empty()) {
            std::cout << "Dequeued " << data.front() << "\n";
            data.pop_front();
        }
    }

}

一开始很简单。现在 运行ning 例如test.exe READER 将相反地打印如下内容:

锁定与同步

目标是 运行 writer 和 reader 同时。由于缺少锁定和同步,这并不像现在这样安全。让我们添加它:

class Memory {
    static constexpr size_t max_capacity = 100;
  public:
    Memory(const char* name, size_t size)
        : name_(name)
        , sm_(ip::open_or_create, name, size)
        , mx_(*sm_.find_or_construct<Mutex>("mutex")())
        , cv_(*sm_.find_or_construct<Cond>("condition")())
        , data_(*sm_.find_or_construct<DataStructure>("data")(
              sm_.get_segment_manager()))
    { }

    // ... 

  private:
    std::string    name_;
    Segment        sm_;
    Mutex&         mx_;
    Cond&          cv_;
    DataStructure& data_;
};

现在让我们小心点。因为我们希望 data_ 队列上的所有操作都同步,所以我们不会像以前那样公开它(使用 get() 成员函数)。相反,我们公开了我们支持的操作的确切接口:

size_t queue_length() const;
void enqueue(std::string message); // blocking when queue at max_capacity
std::string dequeue();             // blocking dequeue
std::optional<std::string> try_dequeue(); // non-blocking dequeue

这些都按要求进行锁定,正如您所期望的那样:

size_t queue_length() const {
    ip::scoped_lock<Mutex> lk(mx_);
    return data_.size();
}

潜在的阻塞操作变得更加有趣。我选择了最大容量,所以enqueue需要等待容量:

// blocking when queue at max_capacity
void enqueue(std::string message) {
    ip::scoped_lock<Mutex> lk(mx_);
    cv_.wait(lk, [this] { return data_.size() < max_capacity; });

    data_.emplace_back(std::move(message));
    cv_.notify_one();
}

相反,dequeue 需要等待消息可用:

// blocking dequeue
std::string dequeue() {
    ip::scoped_lock<Mutex> lk(mx_);
    cv_.wait(lk, [this] { return not data_.empty(); });

    return do_pop();
}

或者,您可以使其成为非阻塞的,只需选择性地返回一个值:

// non-blocking dequeue
std::optional<std::string> try_dequeue() {
    ip::scoped_lock<Mutex> lk(mx_);

    if (data_.empty())
        return std::nullopt;
    return do_pop();
}

现在主要让我们有三个版本:writer,reader 和 continuous reader(后者演示阻塞接口):

#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/sync/interprocess_condition_any.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>

#include <boost/container/scoped_allocator.hpp>
#include <boost/interprocess/containers/deque.hpp>
#include <boost/interprocess/containers/string.hpp>

#include <iostream>
#include <iomanip>
#include <optional>

namespace ip = boost::interprocess;
namespace bc = boost::container;

namespace Shared {

    using Segment = ip::managed_shared_memory;
    using Mgr     = Segment::segment_manager;
    template <typename T>
    using Alloc = bc::scoped_allocator_adaptor<ip::allocator<T, Mgr>>;
    template <typename T> using Deque = ip::deque<T, Alloc<T>>;
    using String = ip::basic_string<char, std::char_traits<char>, Alloc<char>>;

    using DataStructure = Deque<String>;
    using Mutex         = ip::interprocess_mutex;
    using Cond          = ip::interprocess_condition;

    class Memory {
        static constexpr size_t max_capacity = 100;
      public:
        Memory(const char* name, size_t size)
            : name_(name)
            , sm_(ip::open_or_create, name, size)
            , mx_(*sm_.find_or_construct<Mutex>("mutex")())
            , cv_(*sm_.find_or_construct<Cond>("condition")())
            , data_(*sm_.find_or_construct<DataStructure>("data")(
                  sm_.get_segment_manager()))
        { }

        size_t queue_length() const {
            ip::scoped_lock<Mutex> lk(mx_);
            return data_.size(); // caution: racy by design!
        }

        // blocking when queue at max_capacity
        void enqueue(std::string message) {
            ip::scoped_lock<Mutex> lk(mx_);
            cv_.wait(lk, [this] { return data_.size() < max_capacity; });

            data_.emplace_back(std::move(message));

            cv_.notify_one();
        }

        // blocking dequeue
        std::string dequeue() {
            ip::scoped_lock<Mutex> lk(mx_);
            cv_.wait(lk, [this] { return not data_.empty(); });

            return do_pop();
        }

        // non-blocking dequeue
        std::optional<std::string> try_dequeue() {
            ip::scoped_lock<Mutex> lk(mx_);

            if (data_.empty())
                return std::nullopt;
            return do_pop();
        }

      private:
        std::string    name_;
        Segment        sm_;
        Mutex&         mx_;
        Cond&          cv_;
        DataStructure& data_;

        // Assumes mx_ locked by current thread!
        std::string do_pop() {
            auto&& tmp = std::move(data_.front());
            data_.pop_front();
            cv_.notify_all(); // any of the waiters might be a/the writer
            return std::string(tmp.begin(), tmp.end());
        }
    };

} // namespace Shared

int main(int argc, char**)
{
    Shared::Memory mem("SharedMemory", 10*1024*1024);

    switch (argc) {
    case 1:
        mem.enqueue("Hello");
        mem.enqueue("World");
        std::cout << "Total queued: " << mem.queue_length() << "\n";
        break;
    case 2:
        std::cout << "Found entries: " << mem.queue_length() << "\n";
        while (auto msg = mem.try_dequeue()) {
            std::cout << "Dequeued " << *msg << "\n";
        }
        break;
    case 3: 
        std::cout << "Continuous reader\n";
        while (true) {
            std::cout << "Dequeued " << mem.dequeue() << "\n";
        }
        break;
    }
}

小演示:

总结,注意

请注意,上面的内容有些松散。值得注意的是,Boost Interprocess 中缺少健壮的锁,需要格外小心才能在不持有锁的情况下正确关闭。

我建议也与 ip::message_queue 进行对比:

  • How to put file in boost::interprocess::managed_shared_memory?(对比共享内存,message_queue 和纯 TCP 套接字)