CPP:这是处理线程安全 FIFIO 消息的合理方式吗 queues

CPP: Is this a reasonable way to go about thread safe FIFIO message queues

请保持温柔 - 我第一次在这里发帖。目前正在自学 C++。想玩线程并在它们之间传递任何类型的消息。尚未进入线程部分 - 仅处理消息传递 queue 元素。下面是我放在一起的有效代码 - 至少在单个 main() 线程中。在我开始创建自定义数据 objects 和线程函数之前 - 我想就我目前所做的是否可行(如果不完全符合 convention/standards 或完全有效)达成共识 - 所以真的只是寻找方向,如果这会在以后使用线程产生不必要的复杂性。

所以两个代码文件:

messageQueue.h 其中包括实现以及 header 因为无法让链接器与单独的 cpp 文件中的实现一起工作.

#pragma once
#include<queue>         // for queue
#include<thread>        // for smart pointers
#include<mutex>         // for locking of the queue when doing push(), front() and pop()



/*
Purpose:
To provide a thread safe method of of passing messages between threads in a FIFO queue manner.
Currently this uses unique_ptr which enables a one to one publisher/consumer application, if
we need one to many publisher/consumer model then will need to investigate maintaining a collection 
of subscribers (consumers) and use shared_ptr.
Usage notes:
Message objects need to be created using make_unique not using new - e.g.
    auto messagePtr = std::make_unique<message_t<std::string>>("this is a message");
Call this way: 
    messageQueue<objectType> messageQueueName;
    messageQueueName.publish(std::move(messagePtr));
    messagePtr = messageQueueName.consume();
*/

template<class T>
class message_t
{
public:
    message_t(T message) : m_message{ message } {}

private:
    T m_message;
};

template<class T>
class messageQueue
{
public:
    messageQueue() {};
    void publish(std::unique_ptr<message_t<T>> messagePtr);
    std::unique_ptr<message_t<T>> consume();
    bool hasData();
private:
    std::queue < std::unique_ptr<message_t<T>>> m_queue;
    std::mutex m_mutex;

};

// Had to add the implementation into the header file as templated classes have link issues when in separate cpp files :-(

template<class T>
void messageQueue<T>::publish(std::unique_ptr<message_t<T>> messagePtr) {
    std::lock_guard<std::mutex> lock(m_mutex); 
    m_queue.push(std::move(messagePtr));
};

template<class T>
std::unique_ptr<message_t<T>> messageQueue<T>::consume() {
    std::unique_ptr<message_t<T>> retVal;
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (!m_queue.empty()) { retVal = std::move(m_queue.front()); }
        m_queue.pop();
    }

    return retVal;
};

template<class T>
bool messageQueue<T>::hasData() {
    std::lock_guard<std::mutex> lock(m_mutex);
    return(!m_queue.empty());
};

main.cpp - 里面只有一个非常简单的测试用例。

#include "messageQueue.h"
#include<string>
#include<iostream>


int main(void) {
    messageQueue<std::string> textQueue;                        // Create the message queue 
    auto messagePtr = std::make_unique<message_t<std::string>>("this is a message");    // Create a message and get its unique pointer
    std::cout << "messagePtr= " << messagePtr << std::endl;     // output the pointer value
    textQueue.publish(std::move(messagePtr));                   // Push the message pointer onto the queue : messagePtr is now 0
    auto newMessagePtr = std::make_unique<message_t<std::string>>("this is another message");   // Create a second message and get its unique pointer
    std::cout << "newMessagePtr= " << newMessagePtr << std::endl;   // output the new message pointer value
    std::cout << "Consuming messagePtr into newMessagePtr" << std::endl;    //comment
    if (textQueue.hasData()) { newMessagePtr = textQueue.consume(); }   // Pull the original pointer off the queue and assign to new message pointer
    std::cout << "newMessagePtr= " << newMessagePtr << std::endl;   // output the pointer in the new message pointer - should be first pointer value


    return(0);
}

我看到的一个问题是,如果消息队列为空,您仍然会从中弹出。

还有一个概念性问题:在多线程环境中 hasData() 永远没有意义。不管它 return 是什么,它都可能立即被另一个线程无效。因此,除非您接受其正确性的不确定性,否则任何代码都不能真正依赖于 return 值。

更常见的方法是无条件地 try_consume 值(如果没有消息则 return nullptr)——你基本上已经实现为 consume.

另一种方法是使用阻塞 consume,它会暂停当前线程,直到其他线程写入队列。然而,这需要使用信令,例如通过 std::condition_variable 或原子。这有其缺陷,但如果您正在学习,花时间了解它的工作原理是值得的。