boost::interprocess 如何为工作进程实现简单的线程安全作业队列

boost::interprocess how to implement a simple thread safe job queue for worker processes

我正在尝试创建一个基本系统,用于从进程之间的队列中获取作业,并在 Windows 上增强进程间通信。当工作进程空闲时,它会从共享队列区域中获取一个作业。 代码是从文档中的示例松散复制的。

我有一个 child 进程,它试图从存储在共享内存中的队列中获取作业,作为 Jobs。问题是,一旦 child 尝试读取 elem = q.front();SafeQueue::next() 中队列的前端,它就会崩溃(下面评论)。 child 进程将在队列为空时终止(当它 returns -999)。

我觉得自己做错了什么。我是 Boost IPC 的新手,非常感谢有关如何实现这个简单的工作队列系统的任何指示或建议。

#include <boost/interprocess/windows_shared_memory.hpp>
#include <boost/interprocess/managed_windows_shared_memory.hpp>
#include <boost/interprocess/smart_ptr/shared_ptr.hpp>
#include <boost/interprocess/shared_memory_object.hpp>
#include <string>
#include <thread>
#include <iostream>
#include <mutex>
#include <queue>

using namespace boost::interprocess;


class SafeQueue {
    std::queue<int> q;
    std::mutex m;
public:
    SafeQueue() {}

    void push(int elem) {
        m.lock();
        q.push(elem);
        m.unlock();
    }

    void push(std::vector<int> elem) {
        m.lock();
        for (int e : elem) {
            q.push(e);
        }
        m.unlock();
    }

    int next() {
        int elem = -999;
        m.lock();
        if (!q.empty()) {
            elem = q.front(); //crashes here
            q.pop();
        }
        m.unlock();
        return elem;
    }
};


class Jobs
{
public:
    SafeQueue queue;
};

typedef managed_shared_ptr<Jobs, managed_windows_shared_memory>::type my_shared_ptr;

int main(int argc, char* argv[])
{

    if (argc == 1) {  //Parent process
        std::cout << "starting as parent" << std::endl;
        managed_windows_shared_memory segment(create_only, "MySharedMemory", 4096);

        my_shared_ptr sh_ptr = make_managed_shared_ptr(segment.construct<Jobs>("object to share")(), segment);
        sh_ptr->queue.push({1, 2, 3});
        std::string command = "\"" + std::string(argv[0]) + "\"";
        command += " child ";

        std::thread t([](const std::string& command) {
            std::system(command.c_str());
            }, command);
        while (true) {
            
        }
    }
    else {
        std::cout << "starting as child" << std::endl;

        //Open already created shared memory object.
        managed_windows_shared_memory shm(open_only, "MySharedMemory");
        Jobs* shared_job_list = shm.find<Jobs>("object to share").first;
        
        std::vector<int> taken;
        
        while (true) {
            int result;
            if ((result = shared_job_list->queue.next()) != -999) {
                taken.push_back(result);
                std::cout << "took job " << result << std::endl;
                continue;
            }
            break;
        }
        std::string out = "taken jobs: ";
        for (int res : taken) {
            out += ", " + res;
        }
        std::cout << out << std::endl;
        return 0;
    }
    return 0;
}

共享作业的内部数据必须是无指针的才能与多个进程一起工作。但这不是因为它包含 std::queue 。里面的指针不会跨进程工作。