boost::interprocess::message_queue 必须由写入它的进程创建?

boost::interprocess::message_queue has to be created by the process which writes to it?

我有两个进程使用相同的代码相互通信,具体取决于 boost 1.58.0 的 boost::interprocess::message_queue。

typedef boost::shared_ptr<boost::interprocess::message_queue> mq_ptr;

mq_ptr m_t2b_queue;
mq_ptr m_b2t_queue;


sink::sink(const char * conn_id, bool is_parent)
{
    const int MAX_MESSAGE_NUMBER = 100000;


    snprintf( m_t2b_queue_name, MAX_MQ_NAME_LEN, "%s_t2b", conn_id);
    snprintf( m_b2t_queue_name, MAX_MQ_NAME_LEN, "%s_b2t", conn_id);

    if( is_parent )
    {
        message_queue::remove("ffqs5hbKgFs_t2b"/*m_t2b_queue_name*/);
        message_queue::remove("ffqs5hbKgFs_b2t"/*m_b2t_queue_name*/);

        permissions perm;
        perm.set_unrestricted();
        m_t2b_queue.reset(new message_queue(create_only, "ffqs5hbKgFs_t2b"/*m_t2b_queue_name*/, MAX_MESSAGE_NUMBER, 24 /*sizeof(mq_item_t)*/, perm));
        m_b2t_queue.reset(new message_queue(create_only, "ffqs5hbKgFs_b2t"/*m_b2t_queue_name*/, MAX_MESSAGE_NUMBER, 24 /*sizeof(mq_item_t)*/, perm));
    }
    else
    {
        m_t2b_queue.reset(new message_queue(open_only, "ffqs5hbKgFs_t2b"/*m_t2b_queue_name*/));
        m_b2t_queue.reset(new message_queue(open_only, "ffqs5hbKgFs_b2t"/*m_b2t_queue_name*/));
        printf( "t2b max msg size = %d\n", m_t2b_queue->get_max_msg_size() );
        printf( "b2t max msg size = %d\n", m_b2t_queue->get_max_msg_size() );
    }    
}

以上代码中的变量已经替换为硬编码值,这样更清晰

父进程创建接收器实例

sink parent( "ffqs5hbKgFs", true);

通过这种方式,我假设父进程在内部创建了 2 message_queue。一个用于读取/另一个用于写入。

然后父进程创建创建接收器实例的子进程,如

sink child( "ffqs5hbKgFs", false);

我想子进程会打开由父进程创建的现有 2 message_queue。并使用其中之一向父进程发送消息。

问题是,子进程中的message_queue打开成功,但是它的max_msg_size为零

t2b max msg size = 0
b2t max msg size = 0

这会在 clid 进程尝试通过 message_queue 发送消息时导致错误。

问题:这是否意味着 message_queue 必须由写入它的进程创建?

在我的场景中,我希望 message_queue 总是由父进程创建,这样当子进程崩溃时,一个新的子进程可以附加到现有的 message_queue 以恢复工作。


原因:终于找到原因了,一个进程是运行 as x86。另一个是 运行 as x64

您需要在队列创建期间同步访问。您实际上是在同步文件系统访问。

在这里您可以看到正在重现的故障:

Compiling On Coliru

int main() {
    std::cout << "Before: " << getpid() << "\n";

    if (int child = fork()) {
        std::cout << "Parent: " << getpid() << "\n";
        sink parent("ffqs5hbKgFs", true);

        int status;
        waitpid(child, &status, 0);
    } else {
        std::cout << "Child: " << getpid() << "\n";
        sink parent("ffqs5hbKgFs", false);
    }
}

版画

terminate called after throwing an instance of 'boost::interprocess::interprocess_exception'
what():  No such file or directory

当然,一个简单的睡眠就可以证明这是一个竞争条件(你期望什么,child 将打开队列,而 parent 只是 remove-ing 他们):

} else {
    std::cout << "Child: " << getpid() << "\n";
    sleep(1); // one second
    sink parent("ffqs5hbKgFs", false);
}

打印,例如:

Before: 3318
Child: 3319
t2b max msg size = 24
b2t max msg size = 24
Parent: 3318

进一步阅读

有关正确的同步解决方案,请参阅 the docs:

As mentioned before, the ability to shared memory between processes through memory mapped files or shared memory objects is not very useful if the access to that memory can't be effectively synchronized. This is the same problem that happens with thread-synchronization mechanisms, where heap memory and global variables are shared between threads, but the access to these resources needs to be synchronized typically through mutex and condition variables. Boost.Threads implements these synchronization utilities between threads inside the same process. Boost.Interprocess implements similar mechanisms to synchronize threads from different processes

完整演示

Compiling On Coliru

#include <boost/shared_ptr.hpp>
#include <boost/make_shared.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <iostream>
#include <mqueue.h>

namespace bip = boost::interprocess;

typedef boost::shared_ptr<boost::interprocess::message_queue> mq_ptr;

struct mq_item_t {
    char data[24];
};

struct sink {
    std::string m_t2b_queue_name;
    std::string m_b2t_queue_name;
    mq_ptr m_t2b_queue;
    mq_ptr m_b2t_queue;

    sink(const char * conn_id, bool is_parent)
        : 
        m_t2b_queue_name(conn_id + std::string("_t2b")),
        m_b2t_queue_name(conn_id + std::string("_b2t"))
    {
        const int MAX_MESSAGE_NUMBER = 100000;

        if( is_parent )
        {
            bip::message_queue::remove(m_t2b_queue_name.c_str());
            bip::message_queue::remove(m_b2t_queue_name.c_str());

            bip::permissions perm;
            perm.set_unrestricted();
            m_t2b_queue.reset(new bip::message_queue(bip::create_only, m_t2b_queue_name.c_str(), MAX_MESSAGE_NUMBER, sizeof(mq_item_t), perm));
            m_b2t_queue.reset(new bip::message_queue(bip::create_only, m_b2t_queue_name.c_str(), MAX_MESSAGE_NUMBER, sizeof(mq_item_t), perm));
        }
        else
        {
            m_t2b_queue.reset(new bip::message_queue(bip::open_only, m_t2b_queue_name.c_str()));
            m_b2t_queue.reset(new bip::message_queue(bip::open_only, m_b2t_queue_name.c_str()));
            std::cout << "t2b max msg size = " << m_t2b_queue->get_max_msg_size() << "\n";
            std::cout << "b2t max msg size = " << m_b2t_queue->get_max_msg_size() << "\n";
        }    
    }
};

#include <sys/types.h>
#include <sys/wait.h>

int main() {
    std::cout << "Before: " << getpid() << "\n";

    if (int child = fork()) {
        std::cout << "Parent: " << getpid() << "\n";
        sink parent("ffqs5hbKgFs", true);

        int status;
        waitpid(child, &status, 0);
    } else {
        std::cout << "Child: " << getpid() << "\n";
        sleep(1); // one second
        sink parent("ffqs5hbKgFs", false);
    }
}