从一个进程到另一个 C++ 的信号
Signal from one process to another C++
我知道标题有点宽泛,所以让我详细说明一下。
我有 2 个进程 运行,一个正在写入共享内存,另一个正在从中读取。
为了实现共享内存效果,我使用 boost::interprocess(顺便说一句,如果有更方便的库,请告诉我)。
所以我执行了以下操作:
//作家
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/windows_shared_memory.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <iostream>
namespace ip = boost::interprocess;
class SharedMemory
{
public:
template<typename OpenOrCreate>
SharedMemory(OpenOrCreate criteria, const char* name, ip::mode_t mode, size_t size) :
name_(name),
sm_(std::make_shared<ip::windows_shared_memory>(criteria, name, mode, size))
{
}
template<typename OpenOrCreate>
SharedMemory(OpenOrCreate criteria, const char* name, ip::mode_t mode) :
name_(name),
sm_(std::make_shared<ip::windows_shared_memory>(criteria, name, mode))
{
}
std::shared_ptr<ip::windows_shared_memory> getSM()
{
return sm_;
}
private:
std::function<void()> destroyer_;
std::string name_;
std::shared_ptr<ip::windows_shared_memory> sm_;
};
int main()
{
SharedMemory creator(ip::create_only, "SharedMemory", ip::read_write, 10);
ip::mapped_region region(*creator.getSM(), ip::read_write);
std::memset(region.get_address(), 1, region.get_size());
int status = system("reader.exe");
std::cout << status << std::endl;
}
所以我正在创建共享内存,向其中写入 1,然后调用 reader exe。 (我跳过 reader 部分,因为它几乎相同,但不是写而是读)
这段代码工作正常,我写入内存,其他进程读取它并打印我的 1。
但是,如果我同时拥有这 2 个 exe 运行 并且我想写入内存然后通知其他进程有更新怎么办?如何从一个 exe/process 向另一个发送信号?
场景是我正在流式传输一些实时数据,写入内存,然后告诉其他进程有更新。
我认为确实有更方便的方法。
原则上,要在进程之间进行同步,您使用与在进程内部(线程之间)进行同步的所有相同方法:使用同步原语(mutex/critical 部分、条件变量、信号量、屏障等)。
另外,你需要有一个你同步的数据结构。这恰恰是目前的阿喀琉斯之踵。这里完全没有数据结构。
尽管您可以 使用您自己的逻辑进行原始字节访问,但我看不出使用高级库这样做有什么吸引力。相反,我会使用托管内存段,它允许您按名称查找或构造类型化对象。这可能包括您的同步原语。
事实上,您可以使用已内置所有同步功能的 message_queue
来加快该过程。
手动同步:编写器使用段管理器
我将提供可移植代码,因为我没有 windows 机器。首先让我们考虑一个数据结构。一个简单的例子是消息队列。让我们使用 deque<string>
.
不完全是微不足道的数据结构,但好消息是 Boost Interprocess 附带了使事情正常进行的所有细节(使用进程间分配器)。
namespace Shared {
using Segment = ip::managed_shared_memory;
using Mgr = Segment::segment_manager;
template <typename T>
using Alloc = bc::scoped_allocator_adaptor<ip::allocator<T, Mgr>>;
template <typename T> using Deque = bc::deque<T, Alloc<T>>;
using String = bc::basic_string<char, std::char_traits<char>, Alloc<char>>;
using DataStructure = Deque<String>;
class Memory {
public:
Memory(const char* name, size_t size)
: name_(name)
, sm_(ip::open_or_create, name, size)
, data_(*sm_.find_or_construct<DataStructure>("data")(
sm_.get_segment_manager()))
{
}
DataStructure& get() { return data_; }
DataStructure const& get() const { return data_; }
private:
std::string name_;
Segment sm_;
DataStructure& data_;
};
} // namespace Shared
好了,现在我们可以让 writer 变成这样了:
int main()
{
Shared::Memory creator("SharedMemory", 10*1024*1024);
creator.get().emplace_back("Hello");
creator.get().emplace_back("World");
std::cout << "Total queued: " << creator.get().size() << "\n";
}
这将打印例如
Total queued: 2
Total queued: 4
Total queued: 6
取决于你运行的次数。
Reader那边
现在让我们做 reader 的一面。其实大同小异,还是放在同一个主程序里吧:
int main(int argc, char**)
{
Shared::Memory mem("SharedMemory", 10*1024*1024);
auto& data = mem.get();
bool is_reader = argc > 1;
if (not is_reader) {
data.emplace_back("Hello");
data.emplace_back("World");
std::cout << "Total queued: " << data.size() << "\n";
} else {
std::cout << "Found entries: " << data.size() << "\n";
while (!data.empty()) {
std::cout << "Dequeued " << data.front() << "\n";
data.pop_front();
}
}
}
一开始很简单。现在 运行ning 例如test.exe READER
将相反地打印如下内容:
锁定与同步
目标是 运行 writer 和 reader 同时。由于缺少锁定和同步,这并不像现在这样安全。让我们添加它:
class Memory {
static constexpr size_t max_capacity = 100;
public:
Memory(const char* name, size_t size)
: name_(name)
, sm_(ip::open_or_create, name, size)
, mx_(*sm_.find_or_construct<Mutex>("mutex")())
, cv_(*sm_.find_or_construct<Cond>("condition")())
, data_(*sm_.find_or_construct<DataStructure>("data")(
sm_.get_segment_manager()))
{ }
// ...
private:
std::string name_;
Segment sm_;
Mutex& mx_;
Cond& cv_;
DataStructure& data_;
};
现在让我们小心点。因为我们希望 data_
队列上的所有操作都同步,所以我们不会像以前那样公开它(使用 get()
成员函数)。相反,我们公开了我们支持的操作的确切接口:
size_t queue_length() const;
void enqueue(std::string message); // blocking when queue at max_capacity
std::string dequeue(); // blocking dequeue
std::optional<std::string> try_dequeue(); // non-blocking dequeue
这些都按要求进行锁定,正如您所期望的那样:
size_t queue_length() const {
ip::scoped_lock<Mutex> lk(mx_);
return data_.size();
}
潜在的阻塞操作变得更加有趣。我选择了最大容量,所以enqueue
需要等待容量:
// blocking when queue at max_capacity
void enqueue(std::string message) {
ip::scoped_lock<Mutex> lk(mx_);
cv_.wait(lk, [this] { return data_.size() < max_capacity; });
data_.emplace_back(std::move(message));
cv_.notify_one();
}
相反,dequeue
需要等待消息可用:
// blocking dequeue
std::string dequeue() {
ip::scoped_lock<Mutex> lk(mx_);
cv_.wait(lk, [this] { return not data_.empty(); });
return do_pop();
}
或者,您可以使其成为非阻塞的,只需选择性地返回一个值:
// non-blocking dequeue
std::optional<std::string> try_dequeue() {
ip::scoped_lock<Mutex> lk(mx_);
if (data_.empty())
return std::nullopt;
return do_pop();
}
现在主要让我们有三个版本:writer,reader 和 continuous reader(后者演示阻塞接口):
#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/sync/interprocess_condition_any.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/container/scoped_allocator.hpp>
#include <boost/interprocess/containers/deque.hpp>
#include <boost/interprocess/containers/string.hpp>
#include <iostream>
#include <iomanip>
#include <optional>
namespace ip = boost::interprocess;
namespace bc = boost::container;
namespace Shared {
using Segment = ip::managed_shared_memory;
using Mgr = Segment::segment_manager;
template <typename T>
using Alloc = bc::scoped_allocator_adaptor<ip::allocator<T, Mgr>>;
template <typename T> using Deque = ip::deque<T, Alloc<T>>;
using String = ip::basic_string<char, std::char_traits<char>, Alloc<char>>;
using DataStructure = Deque<String>;
using Mutex = ip::interprocess_mutex;
using Cond = ip::interprocess_condition;
class Memory {
static constexpr size_t max_capacity = 100;
public:
Memory(const char* name, size_t size)
: name_(name)
, sm_(ip::open_or_create, name, size)
, mx_(*sm_.find_or_construct<Mutex>("mutex")())
, cv_(*sm_.find_or_construct<Cond>("condition")())
, data_(*sm_.find_or_construct<DataStructure>("data")(
sm_.get_segment_manager()))
{ }
size_t queue_length() const {
ip::scoped_lock<Mutex> lk(mx_);
return data_.size(); // caution: racy by design!
}
// blocking when queue at max_capacity
void enqueue(std::string message) {
ip::scoped_lock<Mutex> lk(mx_);
cv_.wait(lk, [this] { return data_.size() < max_capacity; });
data_.emplace_back(std::move(message));
cv_.notify_one();
}
// blocking dequeue
std::string dequeue() {
ip::scoped_lock<Mutex> lk(mx_);
cv_.wait(lk, [this] { return not data_.empty(); });
return do_pop();
}
// non-blocking dequeue
std::optional<std::string> try_dequeue() {
ip::scoped_lock<Mutex> lk(mx_);
if (data_.empty())
return std::nullopt;
return do_pop();
}
private:
std::string name_;
Segment sm_;
Mutex& mx_;
Cond& cv_;
DataStructure& data_;
// Assumes mx_ locked by current thread!
std::string do_pop() {
auto&& tmp = std::move(data_.front());
data_.pop_front();
cv_.notify_all(); // any of the waiters might be a/the writer
return std::string(tmp.begin(), tmp.end());
}
};
} // namespace Shared
int main(int argc, char**)
{
Shared::Memory mem("SharedMemory", 10*1024*1024);
switch (argc) {
case 1:
mem.enqueue("Hello");
mem.enqueue("World");
std::cout << "Total queued: " << mem.queue_length() << "\n";
break;
case 2:
std::cout << "Found entries: " << mem.queue_length() << "\n";
while (auto msg = mem.try_dequeue()) {
std::cout << "Dequeued " << *msg << "\n";
}
break;
case 3:
std::cout << "Continuous reader\n";
while (true) {
std::cout << "Dequeued " << mem.dequeue() << "\n";
}
break;
}
}
小演示:
总结,注意
请注意,上面的内容有些松散。值得注意的是,Boost Interprocess 中缺少健壮的锁,需要格外小心才能在不持有锁的情况下正确关闭。
我建议也与 ip::message_queue
进行对比:
- How to put file in boost::interprocess::managed_shared_memory?(对比共享内存,message_queue 和纯 TCP 套接字)
我知道标题有点宽泛,所以让我详细说明一下。
我有 2 个进程 运行,一个正在写入共享内存,另一个正在从中读取。
为了实现共享内存效果,我使用 boost::interprocess(顺便说一句,如果有更方便的库,请告诉我)。
所以我执行了以下操作:
//作家
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/windows_shared_memory.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <iostream>
namespace ip = boost::interprocess;
class SharedMemory
{
public:
template<typename OpenOrCreate>
SharedMemory(OpenOrCreate criteria, const char* name, ip::mode_t mode, size_t size) :
name_(name),
sm_(std::make_shared<ip::windows_shared_memory>(criteria, name, mode, size))
{
}
template<typename OpenOrCreate>
SharedMemory(OpenOrCreate criteria, const char* name, ip::mode_t mode) :
name_(name),
sm_(std::make_shared<ip::windows_shared_memory>(criteria, name, mode))
{
}
std::shared_ptr<ip::windows_shared_memory> getSM()
{
return sm_;
}
private:
std::function<void()> destroyer_;
std::string name_;
std::shared_ptr<ip::windows_shared_memory> sm_;
};
int main()
{
SharedMemory creator(ip::create_only, "SharedMemory", ip::read_write, 10);
ip::mapped_region region(*creator.getSM(), ip::read_write);
std::memset(region.get_address(), 1, region.get_size());
int status = system("reader.exe");
std::cout << status << std::endl;
}
所以我正在创建共享内存,向其中写入 1,然后调用 reader exe。 (我跳过 reader 部分,因为它几乎相同,但不是写而是读)
这段代码工作正常,我写入内存,其他进程读取它并打印我的 1。
但是,如果我同时拥有这 2 个 exe 运行 并且我想写入内存然后通知其他进程有更新怎么办?如何从一个 exe/process 向另一个发送信号?
场景是我正在流式传输一些实时数据,写入内存,然后告诉其他进程有更新。
我认为确实有更方便的方法。
原则上,要在进程之间进行同步,您使用与在进程内部(线程之间)进行同步的所有相同方法:使用同步原语(mutex/critical 部分、条件变量、信号量、屏障等)。
另外,你需要有一个你同步的数据结构。这恰恰是目前的阿喀琉斯之踵。这里完全没有数据结构。
尽管您可以 使用您自己的逻辑进行原始字节访问,但我看不出使用高级库这样做有什么吸引力。相反,我会使用托管内存段,它允许您按名称查找或构造类型化对象。这可能包括您的同步原语。
事实上,您可以使用已内置所有同步功能的 message_queue
来加快该过程。
手动同步:编写器使用段管理器
我将提供可移植代码,因为我没有 windows 机器。首先让我们考虑一个数据结构。一个简单的例子是消息队列。让我们使用 deque<string>
.
不完全是微不足道的数据结构,但好消息是 Boost Interprocess 附带了使事情正常进行的所有细节(使用进程间分配器)。
namespace Shared {
using Segment = ip::managed_shared_memory;
using Mgr = Segment::segment_manager;
template <typename T>
using Alloc = bc::scoped_allocator_adaptor<ip::allocator<T, Mgr>>;
template <typename T> using Deque = bc::deque<T, Alloc<T>>;
using String = bc::basic_string<char, std::char_traits<char>, Alloc<char>>;
using DataStructure = Deque<String>;
class Memory {
public:
Memory(const char* name, size_t size)
: name_(name)
, sm_(ip::open_or_create, name, size)
, data_(*sm_.find_or_construct<DataStructure>("data")(
sm_.get_segment_manager()))
{
}
DataStructure& get() { return data_; }
DataStructure const& get() const { return data_; }
private:
std::string name_;
Segment sm_;
DataStructure& data_;
};
} // namespace Shared
好了,现在我们可以让 writer 变成这样了:
int main()
{
Shared::Memory creator("SharedMemory", 10*1024*1024);
creator.get().emplace_back("Hello");
creator.get().emplace_back("World");
std::cout << "Total queued: " << creator.get().size() << "\n";
}
这将打印例如
Total queued: 2
Total queued: 4
Total queued: 6
取决于你运行的次数。
Reader那边
现在让我们做 reader 的一面。其实大同小异,还是放在同一个主程序里吧:
int main(int argc, char**)
{
Shared::Memory mem("SharedMemory", 10*1024*1024);
auto& data = mem.get();
bool is_reader = argc > 1;
if (not is_reader) {
data.emplace_back("Hello");
data.emplace_back("World");
std::cout << "Total queued: " << data.size() << "\n";
} else {
std::cout << "Found entries: " << data.size() << "\n";
while (!data.empty()) {
std::cout << "Dequeued " << data.front() << "\n";
data.pop_front();
}
}
}
一开始很简单。现在 运行ning 例如test.exe READER
将相反地打印如下内容:
锁定与同步
目标是 运行 writer 和 reader 同时。由于缺少锁定和同步,这并不像现在这样安全。让我们添加它:
class Memory {
static constexpr size_t max_capacity = 100;
public:
Memory(const char* name, size_t size)
: name_(name)
, sm_(ip::open_or_create, name, size)
, mx_(*sm_.find_or_construct<Mutex>("mutex")())
, cv_(*sm_.find_or_construct<Cond>("condition")())
, data_(*sm_.find_or_construct<DataStructure>("data")(
sm_.get_segment_manager()))
{ }
// ...
private:
std::string name_;
Segment sm_;
Mutex& mx_;
Cond& cv_;
DataStructure& data_;
};
现在让我们小心点。因为我们希望 data_
队列上的所有操作都同步,所以我们不会像以前那样公开它(使用 get()
成员函数)。相反,我们公开了我们支持的操作的确切接口:
size_t queue_length() const;
void enqueue(std::string message); // blocking when queue at max_capacity
std::string dequeue(); // blocking dequeue
std::optional<std::string> try_dequeue(); // non-blocking dequeue
这些都按要求进行锁定,正如您所期望的那样:
size_t queue_length() const {
ip::scoped_lock<Mutex> lk(mx_);
return data_.size();
}
潜在的阻塞操作变得更加有趣。我选择了最大容量,所以enqueue
需要等待容量:
// blocking when queue at max_capacity
void enqueue(std::string message) {
ip::scoped_lock<Mutex> lk(mx_);
cv_.wait(lk, [this] { return data_.size() < max_capacity; });
data_.emplace_back(std::move(message));
cv_.notify_one();
}
相反,dequeue
需要等待消息可用:
// blocking dequeue
std::string dequeue() {
ip::scoped_lock<Mutex> lk(mx_);
cv_.wait(lk, [this] { return not data_.empty(); });
return do_pop();
}
或者,您可以使其成为非阻塞的,只需选择性地返回一个值:
// non-blocking dequeue
std::optional<std::string> try_dequeue() {
ip::scoped_lock<Mutex> lk(mx_);
if (data_.empty())
return std::nullopt;
return do_pop();
}
现在主要让我们有三个版本:writer,reader 和 continuous reader(后者演示阻塞接口):
#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/sync/interprocess_condition_any.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/container/scoped_allocator.hpp>
#include <boost/interprocess/containers/deque.hpp>
#include <boost/interprocess/containers/string.hpp>
#include <iostream>
#include <iomanip>
#include <optional>
namespace ip = boost::interprocess;
namespace bc = boost::container;
namespace Shared {
using Segment = ip::managed_shared_memory;
using Mgr = Segment::segment_manager;
template <typename T>
using Alloc = bc::scoped_allocator_adaptor<ip::allocator<T, Mgr>>;
template <typename T> using Deque = ip::deque<T, Alloc<T>>;
using String = ip::basic_string<char, std::char_traits<char>, Alloc<char>>;
using DataStructure = Deque<String>;
using Mutex = ip::interprocess_mutex;
using Cond = ip::interprocess_condition;
class Memory {
static constexpr size_t max_capacity = 100;
public:
Memory(const char* name, size_t size)
: name_(name)
, sm_(ip::open_or_create, name, size)
, mx_(*sm_.find_or_construct<Mutex>("mutex")())
, cv_(*sm_.find_or_construct<Cond>("condition")())
, data_(*sm_.find_or_construct<DataStructure>("data")(
sm_.get_segment_manager()))
{ }
size_t queue_length() const {
ip::scoped_lock<Mutex> lk(mx_);
return data_.size(); // caution: racy by design!
}
// blocking when queue at max_capacity
void enqueue(std::string message) {
ip::scoped_lock<Mutex> lk(mx_);
cv_.wait(lk, [this] { return data_.size() < max_capacity; });
data_.emplace_back(std::move(message));
cv_.notify_one();
}
// blocking dequeue
std::string dequeue() {
ip::scoped_lock<Mutex> lk(mx_);
cv_.wait(lk, [this] { return not data_.empty(); });
return do_pop();
}
// non-blocking dequeue
std::optional<std::string> try_dequeue() {
ip::scoped_lock<Mutex> lk(mx_);
if (data_.empty())
return std::nullopt;
return do_pop();
}
private:
std::string name_;
Segment sm_;
Mutex& mx_;
Cond& cv_;
DataStructure& data_;
// Assumes mx_ locked by current thread!
std::string do_pop() {
auto&& tmp = std::move(data_.front());
data_.pop_front();
cv_.notify_all(); // any of the waiters might be a/the writer
return std::string(tmp.begin(), tmp.end());
}
};
} // namespace Shared
int main(int argc, char**)
{
Shared::Memory mem("SharedMemory", 10*1024*1024);
switch (argc) {
case 1:
mem.enqueue("Hello");
mem.enqueue("World");
std::cout << "Total queued: " << mem.queue_length() << "\n";
break;
case 2:
std::cout << "Found entries: " << mem.queue_length() << "\n";
while (auto msg = mem.try_dequeue()) {
std::cout << "Dequeued " << *msg << "\n";
}
break;
case 3:
std::cout << "Continuous reader\n";
while (true) {
std::cout << "Dequeued " << mem.dequeue() << "\n";
}
break;
}
}
小演示:
总结,注意
请注意,上面的内容有些松散。值得注意的是,Boost Interprocess 中缺少健壮的锁,需要格外小心才能在不持有锁的情况下正确关闭。
我建议也与 ip::message_queue
进行对比:
- How to put file in boost::interprocess::managed_shared_memory?(对比共享内存,message_queue 和纯 TCP 套接字)