boost::interprocess 如何为工作进程实现简单的线程安全作业队列
boost::interprocess how to implement a simple thread safe job queue for worker processes
我正在尝试创建一个基本系统,用于从进程之间的队列中获取作业,并在 Windows 上增强进程间通信。当工作进程空闲时,它会从共享队列区域中获取一个作业。
代码是从文档中的示例松散复制的。
我有一个 child 进程,它试图从存储在共享内存中的队列中获取作业,作为 Jobs
。问题是,一旦 child 尝试读取 elem = q.front();
中 SafeQueue::next()
中队列的前端,它就会崩溃(下面评论)。 child 进程将在队列为空时终止(当它 returns -999)。
我觉得自己做错了什么。我是 Boost IPC 的新手,非常感谢有关如何实现这个简单的工作队列系统的任何指示或建议。
#include <boost/interprocess/windows_shared_memory.hpp>
#include <boost/interprocess/managed_windows_shared_memory.hpp>
#include <boost/interprocess/smart_ptr/shared_ptr.hpp>
#include <boost/interprocess/shared_memory_object.hpp>
#include <string>
#include <thread>
#include <iostream>
#include <mutex>
#include <queue>
using namespace boost::interprocess;
class SafeQueue {
std::queue<int> q;
std::mutex m;
public:
SafeQueue() {}
void push(int elem) {
m.lock();
q.push(elem);
m.unlock();
}
void push(std::vector<int> elem) {
m.lock();
for (int e : elem) {
q.push(e);
}
m.unlock();
}
int next() {
int elem = -999;
m.lock();
if (!q.empty()) {
elem = q.front(); //crashes here
q.pop();
}
m.unlock();
return elem;
}
};
class Jobs
{
public:
SafeQueue queue;
};
typedef managed_shared_ptr<Jobs, managed_windows_shared_memory>::type my_shared_ptr;
int main(int argc, char* argv[])
{
if (argc == 1) { //Parent process
std::cout << "starting as parent" << std::endl;
managed_windows_shared_memory segment(create_only, "MySharedMemory", 4096);
my_shared_ptr sh_ptr = make_managed_shared_ptr(segment.construct<Jobs>("object to share")(), segment);
sh_ptr->queue.push({1, 2, 3});
std::string command = "\"" + std::string(argv[0]) + "\"";
command += " child ";
std::thread t([](const std::string& command) {
std::system(command.c_str());
}, command);
while (true) {
}
}
else {
std::cout << "starting as child" << std::endl;
//Open already created shared memory object.
managed_windows_shared_memory shm(open_only, "MySharedMemory");
Jobs* shared_job_list = shm.find<Jobs>("object to share").first;
std::vector<int> taken;
while (true) {
int result;
if ((result = shared_job_list->queue.next()) != -999) {
taken.push_back(result);
std::cout << "took job " << result << std::endl;
continue;
}
break;
}
std::string out = "taken jobs: ";
for (int res : taken) {
out += ", " + res;
}
std::cout << out << std::endl;
return 0;
}
return 0;
}
共享作业的内部数据必须是无指针的才能与多个进程一起工作。但这不是因为它包含 std::queue 。里面的指针不会跨进程工作。
我正在尝试创建一个基本系统,用于从进程之间的队列中获取作业,并在 Windows 上增强进程间通信。当工作进程空闲时,它会从共享队列区域中获取一个作业。 代码是从文档中的示例松散复制的。
我有一个 child 进程,它试图从存储在共享内存中的队列中获取作业,作为 Jobs
。问题是,一旦 child 尝试读取 elem = q.front();
中 SafeQueue::next()
中队列的前端,它就会崩溃(下面评论)。 child 进程将在队列为空时终止(当它 returns -999)。
我觉得自己做错了什么。我是 Boost IPC 的新手,非常感谢有关如何实现这个简单的工作队列系统的任何指示或建议。
#include <boost/interprocess/windows_shared_memory.hpp>
#include <boost/interprocess/managed_windows_shared_memory.hpp>
#include <boost/interprocess/smart_ptr/shared_ptr.hpp>
#include <boost/interprocess/shared_memory_object.hpp>
#include <string>
#include <thread>
#include <iostream>
#include <mutex>
#include <queue>
using namespace boost::interprocess;
class SafeQueue {
std::queue<int> q;
std::mutex m;
public:
SafeQueue() {}
void push(int elem) {
m.lock();
q.push(elem);
m.unlock();
}
void push(std::vector<int> elem) {
m.lock();
for (int e : elem) {
q.push(e);
}
m.unlock();
}
int next() {
int elem = -999;
m.lock();
if (!q.empty()) {
elem = q.front(); //crashes here
q.pop();
}
m.unlock();
return elem;
}
};
class Jobs
{
public:
SafeQueue queue;
};
typedef managed_shared_ptr<Jobs, managed_windows_shared_memory>::type my_shared_ptr;
int main(int argc, char* argv[])
{
if (argc == 1) { //Parent process
std::cout << "starting as parent" << std::endl;
managed_windows_shared_memory segment(create_only, "MySharedMemory", 4096);
my_shared_ptr sh_ptr = make_managed_shared_ptr(segment.construct<Jobs>("object to share")(), segment);
sh_ptr->queue.push({1, 2, 3});
std::string command = "\"" + std::string(argv[0]) + "\"";
command += " child ";
std::thread t([](const std::string& command) {
std::system(command.c_str());
}, command);
while (true) {
}
}
else {
std::cout << "starting as child" << std::endl;
//Open already created shared memory object.
managed_windows_shared_memory shm(open_only, "MySharedMemory");
Jobs* shared_job_list = shm.find<Jobs>("object to share").first;
std::vector<int> taken;
while (true) {
int result;
if ((result = shared_job_list->queue.next()) != -999) {
taken.push_back(result);
std::cout << "took job " << result << std::endl;
continue;
}
break;
}
std::string out = "taken jobs: ";
for (int res : taken) {
out += ", " + res;
}
std::cout << out << std::endl;
return 0;
}
return 0;
}
共享作业的内部数据必须是无指针的才能与多个进程一起工作。但这不是因为它包含 std::queue 。里面的指针不会跨进程工作。