user3732255 - 9 days ago
C++ Question

Efficiently add to a sorted queue

I previously had a single deque of incoming client messages, which are processed sequentially on a single thread.

// Defined and updated elsewhere:
// std::deque<Message> messages;
// typedef std::deque<Message>::iterator MessageIterator;
// processMessages is the predicate; it returns true for messages that can be removed.
MessageIterator result = std::remove_if(messages.begin(), messages.end(), processMessages);
if (result != messages.end()) messages.erase(result, messages.end());
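
For reference, here is a minimal self-contained sketch of the erase-remove idiom used above. The `Message` struct, the `processMessage` predicate and the `processAll` helper are hypothetical stand-ins (the real types are defined elsewhere), and the rule that negative payloads are "unfinished" is only an assumption to make the example runnable.

```cpp
#include <algorithm>
#include <cstddef>
#include <deque>
#include <string>

// Hypothetical message type; the real Message is defined elsewhere.
struct Message {
    std::string client;
    int payload;
};

// Hypothetical handler: returns true when the message is fully processed
// and can be dropped, false when it has to stay in the queue.
bool processMessage(const Message& m) {
    return m.payload >= 0;  // assumption: negative payloads are "unfinished"
}

// Runs the handler over every queued message and erases the finished ones.
// Returns the number of messages still queued afterwards.
std::size_t processAll(std::deque<Message>& messages) {
    // remove_if takes the range first and the predicate last; it moves the
    // kept elements to the front (preserving their order) and returns the new end.
    auto newEnd = std::remove_if(messages.begin(), messages.end(), processMessage);
    messages.erase(newEnd, messages.end());
    return messages.size();
}
```

Calling `processAll(messages)` once per tick would leave only the unfinished messages in the deque, in their original relative order.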


The problem is that if I receive lots of messages from a single client, the other clients appear unresponsive to the user, because their messages are waiting behind that client's backlog. My solution was to store the messages as a map of deques, keyed by client, and to take the first message from each deque in turn and process it.

//typedef std::map<std::string, std::deque<Message> >::iterator MessageMapIterator;
//std::map<std::string, std::deque<Message> > MessageMap;
unsigned int emptyCount;
do {
    emptyCount = MessageMap.size();
    for (MessageMapIterator it = MessageMap.begin(); it != MessageMap.end(); ++it) {
        if (!it->second.empty()) {
            Message tmpMessage = it->second.front();
            it->second.pop_front();
            if (!ProcessMessage(tmpMessage)) {
                // I need to keep some of the messages.
                unfinishedMessages.push_back(tmpMessage);
            }
        } else {
            emptyCount--;
        }
    }
} while (emptyCount > 0);


The problem with this solution is that it appears to be noticeably slower than performing the operation on the single deque. Would it be worth combining the messages into a single sorted deque before processing them, and if so, is there an efficient algorithm for adding a new message to this queue?

For example, say I have a queue with messages from the following numbered clients:
1,2,3,1,2,2,2

And I get a new message from 1, that message should be put into position 5. However, if I get a message from 2 it should be placed at the end, and if I get a message from 3, it should be placed in position 5. Or I could get a message from a new client (4) whose message should be put into position 4.

It seems like implementing a comparison between Message objects would work, but the result of the comparison depends on the elements currently in the queue, so it would be inefficient if I had to compute information from the queue every time I wanted to insert into it.

Answer

It looks to me as if you want to process messages per client on a round-robin basis.

You could do this using your existing map/deque combo by tracking the last client id you popped and calling the map's upper_bound member function to find the next client each time you pop.
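
A rough sketch of that map/deque variant, under a few assumptions: `ClientId`, `Message` and the class name `RoundRobinQueues` are placeholders I made up, an empty string is assumed never to be a real client id (it serves as the "nobody served yet" value), and drained clients are erased from the map so that every remaining entry has at least one message.

```cpp
#include <cstddef>
#include <deque>
#include <map>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical stand-ins for the question's types.
using ClientId = std::string;
using Message = int;

class RoundRobinQueues {
    std::map<ClientId, std::deque<Message>> queues;
    ClientId lastId;        // client served by the previous pop(); "" at start
    std::size_t total = 0;  // number of messages across all clients

public:
    void push(const ClientId& id, Message m) {
        queues[id].push_back(m);
        ++total;
    }

    // Returns the oldest message of the first client ordered after lastId,
    // wrapping around to the first client when there is none.
    std::pair<ClientId, Message> pop() {
        if (total == 0) throw std::out_of_range("No more messages");
        auto it = queues.upper_bound(lastId);
        if (it == queues.end()) it = queues.begin();
        // Drained clients are erased below, so this deque is never empty.
        std::pair<ClientId, Message> result(it->first, it->second.front());
        it->second.pop_front();
        if (it->second.empty()) queues.erase(it);
        lastId = result.first;
        --total;
        return result;
    }

    bool empty() const { return total == 0; }
    std::size_t size() const { return total; }
};
```

Each pop costs one `upper_bound` lookup, which is logarithmic in the number of clients that currently have pending messages, and messages from the same client still come out in the order they were pushed.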

But this is a little easier to do (and also hopefully more efficient) with a multimap, as you don't need to maintain two containers.

#include <cstddef>    // size_t
#include <iostream>
#include <map>
#include <stdexcept>  // std::out_of_range
#include <utility>    // std::pair, std::make_pair

using client_id = int;
using message = int;

class MessageQueue
{
    private:
        std::multimap<client_id, message> messages;
        client_id lastId = {};

    public:
        using message_value = std::pair<client_id, message>;

        void push(client_id id, message m)
        {
            messages.insert( std::make_pair(id, m) );
        }

        message_value pop()
        {
            // Get first message of the next highest client id
            auto nextMessage = messages.upper_bound(lastId);
            if (nextMessage == messages.end())
            {
                // Nothing higher, so start from the beginning again
                nextMessage = messages.begin();
                if (nextMessage == messages.end())
                {
                    // Nothing left on the queue
                    throw std::out_of_range("No more messages");
                }
            }
            message_value result = *nextMessage;
            messages.erase(nextMessage);
            lastId = result.first;
            return result;
        }

        bool empty() const { return messages.empty(); }
        size_t size() const { return messages.size(); }
};

int main(int argc, char* argv[])
{
    MessageQueue mq;

    auto order = 0;
    // set initial content to 1,2,3,1,2,2,2
    mq.push(1, ++order);
    mq.push(2, ++order);
    mq.push(3, ++order);
    mq.push(1, ++order);
    mq.push(2, ++order);
    mq.push(2, ++order);
    mq.push(2, ++order);

    // Now also push on one of each client as per question
    mq.push(1, ++order);
    mq.push(2, ++order);
    mq.push(3, ++order);
    mq.push(4, ++order);

    // Process all messages
    while (!mq.empty())
    {
        auto m = mq.pop();
        std::cout << "id = " << m.first << ", message = " << m.second << std::endl;
    }
    std::cout << "Done" << std::endl;
}
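
One property the class above relies on is that a multimap keeps elements with the same key in insertion order; since C++11, `insert` places a new element at the upper bound of the range of equal keys. That is what keeps each client's messages in FIFO order. A small sketch to check it (the helper name `valuesFor` is my own):

```cpp
#include <map>
#include <vector>

// Returns the values stored under `key`, in the order the multimap iterates them.
std::vector<int> valuesFor(const std::multimap<int, int>& m, int key) {
    std::vector<int> out;
    auto range = m.equal_range(key);  // [first, second) covers all elements with this key
    for (auto it = range.first; it != range.second; ++it) {
        out.push_back(it->second);
    }
    return out;
}
```

Inserting (2,5), (1,1), (2,6), (1,4), (2,7) and then calling `valuesFor(m, 2)` should give 5, 6, 7. For the pushes in main above, working through the pops by hand gives the client order 1, 2, 3, 4, 1, 2, 3, 1, 2, 2, 2.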

An alternate class that replaces the upper_bound lookup with a manually maintained iterator:

class MessageQueue
{
    private:
        using container = std::multimap<client_id, message>;

        container messages;
        container::iterator lastIter;
        client_id lastId = {};

    public:
        using message_value = container::value_type;

        MessageQueue() : lastIter(messages.end()) {}

        void push(client_id id, message m)
        {
            messages.insert( std::make_pair(id, m) );
        }

        message_value pop()
        {
            auto end = messages.end();
            while (lastIter != end && lastIter->first == lastId)
            {
                ++lastIter;
            }
            if (lastIter == end)
            {
                lastIter = messages.begin();
            }
            if (lastIter == end)
            {
                // Nothing left on the queue
                throw std::out_of_range("No more messages");
            }
            auto currIter = lastIter;

            ++lastIter;
            lastId = currIter->first;

            message_value result = *currIter;
            messages.erase(currIter);
            return result;
        }

        bool empty() const { return messages.empty(); }
        size_t size() const { return messages.size(); }
};