这个 C++ 中的活动对象模式示例有什么问题?

What is wrong with this example of the Active Object pattern in C++?

#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <functional>
#include <any>
#include <condition_variable>

// Dispatch table of message handlers: perform_function() indexes it with
// Message::code and calls the stored handler with the message payload.
std::vector<std::function<void(std::any)>> functions;

// Codes carried by a Message. EXIT is the sentinel that makes a worker loop
// stop; the non-negative codes are used as indices into `functions`.
enum FunctionCodes {
    EXIT   = -1,
    NOTIFY = 0,
    PRINT  = 1
};

// A unit of work sent to a worker: `code` selects what to do and `data`
// carries the type-erased payload for it.
struct Message
{
    int code;
    std::any data;
};

// Looks up the handler stored under the message's code and invokes it with
// the message payload. The code is used as an index without range checking.
void perform_function(const Message& message) {
    const auto& handler = functions[message.code];
    handler(message.data);
}

namespace X {

    /// Unbounded blocking FIFO of Message objects built on a hand-rolled
    /// doubly linked list. push() appends under the mutex and wakes one waiter;
    /// pop() blocks on the condition variable until a message is available.
    /// Nodes still queued when the queue is destroyed are released by the
    /// destructor.
    class MessageQueue
    {
        friend class Thread;

        /// List node. Link direction: `next` points towards the head (the
        /// older neighbour), `prev` points towards the tail (the newer one).
        class Node
        {
        public:
            Node(MessageQueue::Node *next, MessageQueue::Node *prev, const Message& message)
                : next(next), prev(prev), message(message) {}

        private:
            Node* next;
            Node* prev;
            Message message;
            friend class MessageQueue;
        };

        Node *head, *tail;  // head: oldest node (popped first); tail: newest node
        std::mutex mutex;
        std::condition_variable cv;

    public:

        MessageQueue(Node *head, Node *tail) : head(head), tail(tail) {}
        MessageQueue() : MessageQueue(nullptr, nullptr) {}

        // The queue owns its nodes, so copying it would double-free them.
        MessageQueue(const MessageQueue&) = delete;
        MessageQueue& operator=(const MessageQueue&) = delete;

        /// Frees every node that was never popped. Not synchronised: no other
        /// thread may be using the queue while it is being destroyed.
        ~MessageQueue()
        {
            while (head) {
                Node* newer = head->prev;
                delete head;
                head = newer;
            }
            tail = nullptr;
        }

        /// Appends a copy of `message` at the tail and wakes one waiting pop().
        void push(const Message& message)
        {
            {
                std::lock_guard<std::mutex> lock(mutex);
                auto new_node = new Node(tail, nullptr, message);
                if (!tail) {
                    tail = head = new_node;
                } else {
                    tail->prev = new_node;
                    tail = new_node;
                }
            }
            // Notify after releasing the lock so the woken thread can take it at once.
            cv.notify_one();
        }

        /// Blocks until the queue is non-empty, then removes and returns the
        /// oldest message.
        Message pop()
        {
            std::unique_lock<std::mutex> lock(mutex);
            cv.wait(lock, [this](){ return head != nullptr; });

            // Unlink the oldest node while holding the lock.
            Node* oldest = head;
            head = oldest->prev;
            if (head) {
                head->next = nullptr;
            } else {
                tail = nullptr;
            }
            lock.unlock();

            // The node is now private to this thread: move the payload out
            // instead of deep-copying the std::any, and free it off the lock.
            Message message = std::move(oldest->message);
            delete oldest;
            return message;
        }
    };

    /// Worker that owns a MessageQueue and a std::thread draining it: every
    /// popped message is handed to perform_function() until a message with
    /// code EXIT arrives, which ends the loop.
    class Thread
    {
    public:
        // Must be declared before `thread`: members are constructed in
        // declaration order (not in mem-initializer order), and the worker
        // calls queue.pop() as soon as it starts running.
        MessageQueue queue;

    private:
        std::thread thread;

    public:
        Thread() : queue(nullptr, nullptr),
            thread([this]() {
                while (true) {
                    auto message = queue.pop();
                    if (message.code == EXIT) {
                        break;
                    }
                    perform_function(message);
                }
            }) {}

        /// Blocks until the worker loop has finished.
        void join() { thread.join(); }

        /// Joins the worker if that has not happened yet; this blocks until
        /// the worker has consumed an EXIT message.
        ~Thread() { if (thread.joinable()) thread.join(); }
    };
}

// File-scope workers. Each X::Thread starts its worker thread in its
// constructor. `printer` receives PRINT messages from notify(); `notifier`
// receives NOTIFY messages from push_notify().
X::Thread printer;
X::Thread notifier;

// Writes `text` followed by a newline to standard output. A function-local
// mutex is held for the duration of the write.
void cout(const std::string& text) {
    static std::mutex output_guard;
    std::scoped_lock hold{output_guard};
    std::cout << text << '\n';
}

void push_notify(std::any data) {
    notifier.queue.push({NOTIFY, std::move(data)} );
}

void notify(std::any data) {
    printer.queue.push({PRINT, std::move(data)});
}

// Extracts the std::string held in `data` and writes it out through cout().
// std::any_cast throws std::bad_any_cast if the payload is not a std::string.
void print(std::any data) {
    const std::string text = std::any_cast<std::string>(std::move(data));
    cout(text);
}

// Sizes the dispatch table and registers the handler for each message code.
void init_functions() {
    auto& table = functions;
    table.resize(2);  // one slot each for NOTIFY and PRINT
    table[PRINT] = print;
    table[NOTIFY] = notify;
}

// Asks both workers to stop by pushing an EXIT message onto each queue.
// NOTE(review): both EXIT messages are pushed from the calling thread, so the
// printer's EXIT can be queued ahead of PRINT messages that the notifier has
// not forwarded yet; the printer then stops without printing them.
void exit_threads() {
    printer.queue.push({EXIT, {}});
    notifier.queue.push({EXIT, {}});
}

void join_threads() {
    printer.join();
    notifier.join();
}

// Reads a count and that many whitespace-separated words from standard input,
// sends each word through the notifier, then shuts the workers down.
int main()
{
    init_functions();

    int n = 0;
    std::cin >> n;  // a failed read leaves the count at 0

    // `n > 0` rather than `n--`: a negative count must not start a runaway loop.
    for (; n > 0; --n) {
        // FIXME why doesn't it print correctly?..
        std::string text;
        if (!(std::cin >> text)) {
            break;  // input ended early; do not queue empty strings
        }
        push_notify(std::any(std::move(text)));
    }

    exit_threads();
    join_threads();
}

这里一共有 3 个线程:主线程、通知线程和打印机线程。这样做有些多余,仅作示例之用。

在这里,主线程将一条消息推送到通知程序的队列中,告诉它通知打印机进行打印。然后,通知程序通过将消息推送到队列中告诉打印机进行打印来通知打印机。

如果我输入 3 a b c(全部在同一行),程序什么也不输出。但是,如果我逐个输入它们,它会打印除最后一个之外的所有内容。

之所以能打印出除最后一个之外的所有内容,可能是因为相邻两次输入之间存在时间延迟。

为什么会这样?

在正常情况下,您的主线程仅与通知程序通信,然后通知程序与打印机通信。但是对于退出,您的主线程直接与通知程序和打印机通信。

这意味着可以在通知程序将之前的所有消息发送给它之前将 EXIT 消息添加到打印机的队列中。所以给出这些操作:

main sends 'a' to notifier
main sends 'b' to notifier
main sends 'c' to notifier
notifier sends 'a' to printer
notifier sends 'b' to printer
main sends EXIT to notifier
main sends EXIT to printer
notifier sends 'c' to printer

打印机队列看起来像 a, b, EXIT, c。它在打印 c 之前退出。当您在一行中输入所有输入时情况更糟,因为主线程能够在通知程序向其发送任何内容之前将 EXIT 发送到打印机。

解决方案是让 exit_threads 函数只向通知程序发送消息。当通知程序看到 EXIT 消息时,它应该将其转发给打印机并中断。这确保发送到通知程序的所有消息都在 EXIT 之前发送到打印机:

main sends 'a' to notifier
main sends 'b' to notifier
main sends 'c' to notifier
notifier sends 'a' to printer
notifier sends 'b' to printer
main sends EXIT to notifier
notifier sends 'c' to printer
notifier sends EXIT to printer

快速修复:

#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <functional>
#include <any>
#include <condition_variable>

// Dispatch table of message handlers: perform_function() indexes it with
// Message::code and calls the stored handler with the message payload.
std::vector<std::function<void(std::any)>> functions;

// Codes carried by a Message. EXIT is the sentinel that makes a worker loop
// stop; the non-negative codes are used as indices into `functions`.
enum FunctionCodes {
    EXIT   = -1,
    NOTIFY = 0,
    PRINT  = 1
};

// A unit of work sent to a worker: `code` selects what to do and `data`
// carries the type-erased payload for it.
struct Message
{
    int code;
    std::any data;
};

// Looks up the handler stored under the message's code and invokes it with
// the message payload. The code is used as an index without range checking.
void perform_function(const Message& message) {
    const auto& handler = functions[message.code];
    handler(message.data);
}

namespace X {

    /// Unbounded blocking FIFO of Message objects built on a hand-rolled
    /// doubly linked list. push() appends under the mutex and wakes one waiter;
    /// pop() blocks on the condition variable until a message is available.
    /// Nodes still queued when the queue is destroyed are released by the
    /// destructor.
    class MessageQueue
    {
        friend class Thread;

        /// List node. Link direction: `next` points towards the head (the
        /// older neighbour), `prev` points towards the tail (the newer one).
        class Node
        {
        public:
            Node(MessageQueue::Node *next, MessageQueue::Node *prev, const Message& message)
                : next(next), prev(prev), message(message) {}

        private:
            Node* next;
            Node* prev;
            Message message;
            friend class MessageQueue;
        };

        Node *head, *tail;  // head: oldest node (popped first); tail: newest node
        std::mutex mutex;
        std::condition_variable cv;

    public:

        MessageQueue(Node *head, Node *tail) : head(head), tail(tail) {}
        MessageQueue() : MessageQueue(nullptr, nullptr) {}

        // The queue owns its nodes, so copying it would double-free them.
        MessageQueue(const MessageQueue&) = delete;
        MessageQueue& operator=(const MessageQueue&) = delete;

        /// Frees every node that was never popped. Not synchronised: no other
        /// thread may be using the queue while it is being destroyed.
        ~MessageQueue()
        {
            while (head) {
                Node* newer = head->prev;
                delete head;
                head = newer;
            }
            tail = nullptr;
        }

        /// Appends a copy of `message` at the tail and wakes one waiting pop().
        void push(const Message& message)
        {
            {
                std::lock_guard<std::mutex> lock(mutex);
                auto new_node = new Node(tail, nullptr, message);
                if (!tail) {
                    tail = head = new_node;
                } else {
                    tail->prev = new_node;
                    tail = new_node;
                }
            }
            // Notify after releasing the lock so the woken thread can take it at once.
            cv.notify_one();
        }

        /// Blocks until the queue is non-empty, then removes and returns the
        /// oldest message.
        Message pop()
        {
            std::unique_lock<std::mutex> lock(mutex);
            cv.wait(lock, [this](){ return head != nullptr; });

            // Unlink the oldest node while holding the lock.
            Node* oldest = head;
            head = oldest->prev;
            if (head) {
                head->next = nullptr;
            } else {
                tail = nullptr;
            }
            lock.unlock();

            // The node is now private to this thread: move the payload out
            // instead of deep-copying the std::any, and free it off the lock.
            Message message = std::move(oldest->message);
            delete oldest;
            return message;
        }
    };

    /// Worker that owns a MessageQueue and a std::thread draining it: every
    /// popped message is handed to perform_function() until a message with
    /// code EXIT arrives, which ends the loop.
    class Thread
    {
    public:
        // Must be declared before `thread`: members are constructed in
        // declaration order (not in mem-initializer order), and the worker
        // calls queue.pop() as soon as it starts running.
        MessageQueue queue;

    private:
        std::thread thread;

    public:
        Thread() : queue(nullptr, nullptr),
            thread([this]() {
                while (true) {
                    auto message = queue.pop();
                    if (message.code == EXIT) {
                        break;
                    }
                    perform_function(message);
                }
            }) {}

        /// Blocks until the worker loop has finished.
        void join() { thread.join(); }

        /// Joins the worker if that has not happened yet; this blocks until
        /// the worker has consumed an EXIT message.
        ~Thread() { if (thread.joinable()) thread.join(); }
    };
}

// File-scope workers. Each X::Thread starts its worker thread in its
// constructor. `printer` receives PRINT messages from notify(); `notifier`
// receives NOTIFY messages from push_notify().
X::Thread printer;
X::Thread notifier;

// Writes `text` followed by a newline to standard output. A function-local
// mutex is held for the duration of the write.
void cout(const std::string& text) {
    static std::mutex output_guard;
    std::scoped_lock hold{output_guard};
    std::cout << text << '\n';
}

void push_notify(std::any data) {
    notifier.queue.push({NOTIFY, std::move(data)} );
}

void notify(std::any data) {
    printer.queue.push({PRINT, std::move(data)});
}

// Extracts the std::string held in `data` and writes it out through cout().
// std::any_cast throws std::bad_any_cast if the payload is not a std::string.
void print(std::any data) {
    const std::string text = std::any_cast<std::string>(std::move(data));
    cout(text);
}

// Sizes the dispatch table and registers the handler for each message code.
void init_functions() {
    auto& table = functions;
    table.resize(2);  // one slot each for NOTIFY and PRINT
    table[PRINT] = print;
    table[NOTIFY] = notify;
}

void exit_threads() {
    notifier.queue.push({EXIT, {}});
    notifier.join();
    printer.queue.push({EXIT, {}});
    printer.join();
}

// Reads a count and that many whitespace-separated words from standard input,
// sends each word through the notifier, then shuts the workers down.
int main()
{
    init_functions();

    int n = 0;
    std::cin >> n;  // a failed read leaves the count at 0

    // `n > 0` rather than `n--`: a negative count must not start a runaway loop.
    for (; n > 0; --n) {
        std::string text;
        if (!(std::cin >> text)) {
            break;  // input ended early; do not queue empty strings
        }
        push_notify(std::any(std::move(text)));
    }

    exit_threads();
}

这里的不同之处在于:我先让通知程序退出并对它调用 join,确保不再有消息需要转发,然后再让打印机退出并对它调用 join。

此外,也可以改用 std::queue 来代替手写的链表队列:

#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <functional>
#include <any>
#include <condition_variable>
#include <queue>

// Dispatch table of message handlers: perform_function() indexes it with
// Message::code and calls the stored handler with the message payload.
std::vector<std::function<void(std::any)>> functions;

// Codes carried by a Message. EXIT is the sentinel that makes a worker loop
// stop; the non-negative codes are used as indices into `functions`.
enum FunctionCodes {
    EXIT   = -1,
    NOTIFY = 0,
    PRINT  = 1
};

// A unit of work sent to a worker: `code` selects what to do and `data`
// carries the type-erased payload for it.
struct Message
{
    int code;
    std::any data;
};

// Looks up the handler stored under the message's code and invokes it with
// the message payload. The code is used as an index without range checking.
void perform_function(const Message& message) {
    const auto& handler = functions[message.code];
    handler(message.data);
}

namespace X {
    
    /// Worker that owns a mutex-protected std::queue of messages and a
    /// std::thread draining it: every popped message is handed to
    /// perform_function() until a message with code EXIT arrives.
    class Thread
    {
        // The queue, condition variable and mutex are declared before
        // `thread` so they are fully constructed when the worker starts
        // (members are initialised in declaration order).
        std::queue<Message> message_queue;
        std::condition_variable queue_cv;
        std::mutex queue_mutex;
        std::thread thread;

        /// Blocks until a message is available, then removes and returns the
        /// oldest one.
        Message pop()
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            queue_cv.wait(lock, [this](){ return !message_queue.empty(); });
            // The front element is discarded right after, so move it out
            // rather than deep-copying its std::any payload.
            Message message = std::move(message_queue.front());
            message_queue.pop();
            return message;
        }

    public:
        Thread() :
            thread([this]() {
                while (true) {
                    auto message = pop();
                    if (message.code == EXIT) {
                        break;
                    }
                    perform_function(message);
                }
            }) {}

        /// Blocks until the worker loop has finished.
        void join() { thread.join(); }

        /// Joins the worker if that has not happened yet; this blocks until
        /// the worker has consumed an EXIT message.
        ~Thread() { if (thread.joinable()) thread.join(); }

        /// Enqueues `message` and wakes the worker if it is waiting.
        void push(Message message)
        {
            {
                std::lock_guard<std::mutex> lock(queue_mutex);
                message_queue.push(std::move(message));
            }
            // Notify after releasing the lock so the worker can take it at once.
            queue_cv.notify_one();
        }
    };
}

// File-scope workers. Each X::Thread starts its worker thread in its
// constructor. `printer` receives PRINT messages from notify(); `notifier`
// receives NOTIFY messages from push_notify().
X::Thread printer;
X::Thread notifier;

// Writes `text` followed by a newline to standard output. A function-local
// mutex is held for the duration of the write.
void cout(const std::string& text) {
    static std::mutex output_guard;
    std::scoped_lock hold{output_guard};
    std::cout << text << '\n';
}

void push_notify(std::any data) {
    notifier.push({NOTIFY, std::move(data)} );
}

void notify(std::any data) {
    printer.push({PRINT, std::move(data)});
}

// Extracts the std::string held in `data` and writes it out through cout().
// std::any_cast throws std::bad_any_cast if the payload is not a std::string.
void print(std::any data) {
    const std::string text = std::any_cast<std::string>(std::move(data));
    cout(text);
}

// Sizes the dispatch table and registers the handler for each message code.
void init_functions() {
    auto& table = functions;
    table.resize(2);  // one slot each for NOTIFY and PRINT
    table[PRINT] = print;
    table[NOTIFY] = notify;
}

void exit_threads() {
    notifier.push({EXIT, {}});
    notifier.join();
    printer.push({EXIT, {}});
    printer.join();
}

// Reads a count and that many whitespace-separated words from standard input,
// sends each word through the notifier, then shuts the workers down.
int main()
{
    init_functions();

    int n = 0;
    std::cin >> n;  // a failed read leaves the count at 0

    // `n > 0` rather than `n--`: a negative count must not start a runaway loop.
    for (; n > 0; --n) {
        std::string text;
        if (!(std::cin >> text)) {
            break;  // input ended early; do not queue empty strings
        }
        push_notify(std::any(std::move(text)));
    }

    exit_threads();
}