有效地添加到排序队列

Efficiently add to a sorted queue

我以前有一个传入消息的双端队列,这些消息来自客户端消息,在单个线程上顺序处理。

//defined and updated elsewhere
//std::deque<Message> messages;
//typedef std::deque<Message>::iterator MessageIterator
MessageIterator result = std::remove_if(processMessages, messages.begin(), messages.end());
if(result != messages.end()) messages.erase(result,messages.end());

问题是,如果我从一个客户端收到大量消息,其他客户端似乎对用户没有响应,因为它们的消息正在等待处理。我对此的解决方案是将消息存储为 deques 的映射,并从每个消息中取出第一个并进行处理。

//typedef std::map<std::string, std::deque<Message> > MessageMapIterator
//std::map<std::string, std::deque<Message> > MessageMap
unsigned int emptyCount;
do{
    emptyCount = MessageMap.size();
    for(MessageMapIterator it = MessageMap.begin(); it != MessageMap.end(); ++it){
        if(!it->second.empty()){
            Message tmpMessage = it->second.front();
            it->second.pop_front();
            if(!ProcessMessage(tmpMessage)){
                //I need to keep some of the messages.
                unfinishedMessages.push_back(tmpMessage);
            }
        }else{
            emptyCount--;
        }
    }
}while(emptyCount > 0);

此解决方案的问题在于,它似乎比在单个双端队列上执行操作要慢得多。我想知道在处理消息之前将消息组合成一个排序的单个双端队列是否值得,如果是这样,是否有一种有效的算法可以将新消息添加到该队列。

例如,假设我有一个队列,其中包含来自以下编号客户端的消息。 1,2,3,1,2,2,2

我从 1 收到一条新消息,该消息应该放在位置 5。但是,如果我从 2 收到一条消息,它应该放在最后,如果我从 3 收到一条消息,它应该放在位置 5。或者我可以从一个新客户 (4) 那里得到一条消息,他的消息应该放在位置 4。

貌似实现Message对象之间的比较是可行的,但是比较的结果取决于队列中的当前元素,如果我需要根据队列计算信息,则效率不高,每个我想插入的时间。

在我看来,您似乎想要循环处理每个客户端的消息。

您可以使用现有的 map/deque 组合来执行此操作,方法是跟踪您弹出的最后一个客户端 ID,并使用 upper_bound 成员函数在每次弹出下一个客户端时获取下一个客户端。

但是使用多地图这样做更容易(并且希望更有效),因为您不需要维护两个容器。

#include <map>
#include <iostream>
#include <exception>

using client_id = int;
using message = int;

class MessageQueue
{
    private:
        std::multimap<client_id, message> messages;
        client_id lastId = {};

    public:
        using message_value = std::pair<client_id, message>;

        void push(client_id id, message m)
        {
            messages.insert( std::make_pair(id, m) );
        }

        message_value pop()
        {
            // Get first message of the next highest client id
            auto nextMessage = messages.upper_bound(lastId);
            if (nextMessage == messages.end())
            {
                // Nothing higher, so start from the beginning again
                nextMessage = messages.begin();
                if (nextMessage == messages.end())
                {
                    // Nothing left on the queue
                    throw std::out_of_range("No more messages");
                }
            }
            message_value result = *nextMessage;
            messages.erase(nextMessage);
            lastId = result.first;
            return result;
        }

        size_t empty() const { return messages.empty(); }
        size_t size() const { return messages.size(); }
};

int main(int argc, char* argv[])
{
    MessageQueue mq;

    auto order = 0;
    // set initial content to 1,2,3,1,2,2,2
    mq.push(1, ++order);
    mq.push(2, ++order);
    mq.push(3, ++order);
    mq.push(1, ++order);
    mq.push(2, ++order);
    mq.push(2, ++order);
    mq.push(2, ++order);

    // Now also push on one of each client as per question
    mq.push(1, ++order);
    mq.push(2, ++order);
    mq.push(3, ++order);
    mq.push(4, ++order);

    // Process all messages
    while (!mq.empty())
    {
        auto m = mq.pop();
        std::cout << "id = " << m.first << ", message = " << m.second << std::endl;
    }
    std::cout << "Done" << std::endl;
}

替代 class 将 upper_bound 方法替换为基于手动迭代器的方法:

class MessageQueue
{
    private:
        using container = std::multimap<client_id, message>;

        container messages;
        container::iterator lastIter;
        client_id lastId = {};

    public:
        using message_value = container::value_type;

        MessageQueue() : lastIter(messages.end()) {}

        void push(client_id id, message m)
        {
            messages.insert( std::make_pair(id, m) );
        }

        message_value pop()
        {
            auto end = messages.end();
            while (lastIter != end && lastIter->first == lastId)
            {
                ++lastIter;
            }
            if (lastIter == end)
            {
                lastIter = messages.begin();
            }
            if (lastIter == end)
            {
                // Nothing left on the queue
                throw std::out_of_range("No more messages");
            }
            auto currIter = lastIter;

            ++lastIter;
            lastId = currIter->first;

            message_value result = *currIter;
            messages.erase(currIter);
            return result;
        }

        size_t empty() const { return messages.empty(); }
        size_t size() const { return messages.size(); }
};