OzBarry OzBarry - 3 months ago 9
C++ Question

Implementing an atomic queue in Qt 5

I am building a rather complicated application in Qt that loads shared libraries at runtime and runs them as threads. These threads must pass information between each other as quickly as possible, so I figured an atomic (lock-free) queue would be my best option. This is the AtomicQueue.hpp file:

#ifndef ATOMICQUEUE_HPP
#define ATOMICQUEUE_HPP

#include <QAtomicPointer>


// Used http://www.drdobbs.com/parallel/writing-lock-free-code-a-corrected-queue/210604448?pgno=2
// as reference
// Queue built on a singly linked list of heap-allocated nodes that starts
// with a dummy node. m_front is the oldest node still allocated, m_divider is
// the node whose value was consumed last (initially the dummy node), and
// m_tail is the most recently pushed node.
//
// NOTE: this is the version that crashes, as described in the question.
// ~QueueNode deletes its successor, so the cleanup loop in push() frees the
// entire chain instead of a single node.
template<class T>
class AtomicQueue
{
// List node: one stored value plus a raw pointer to the following node.
struct QueueNode
{
QueueNode( const T& value ) : next( NULL ), data( value ) {}
// BUG: deleting one node recursively deletes every node that follows it.
~QueueNode() { if ( next ) delete next; }
QueueNode *next;
T data;
};

public:
// Allocates the dummy node (holding a default-constructed T) and points
// m_front, m_tail and m_divider at it.
AtomicQueue()
{
m_front = new QueueNode( T() );
m_tail.store( m_front );
m_divider.store( m_front );
}

// Empty: no node is released when the queue is destroyed.
~AtomicQueue() {}

// Appends a copy of value after the current tail, then deletes the nodes
// between m_front and m_divider (the ones already consumed by pop()).
void push( const T& value )
{
m_tail.load()->next = new QueueNode( value );
m_tail = m_tail.load()->next; // Make the freshly linked node the new tail.
while( m_front != m_divider.load() )
{
// m_front is advanced before the delete, but ~QueueNode then deletes
// tmp->next as well (and, recursively, everything after it), so m_front,
// m_divider and m_tail can all end up pointing at freed nodes.
QueueNode *tmp = m_front;
m_front = m_front->next;
delete tmp;
}
}

// Copies the value that follows m_divider into result without removing it.
// Returns false when m_divider equals m_tail or the next pointer is null.
bool peek( T& result )
{
if ( m_divider.load() != m_tail.load() )
{
// Problem area: the segfault reported in the question happens at the
// read of next->data below.
QueueNode *next = m_divider.load()->next;
if ( next )
{
result = next->data;
return true;
}
}
return false;
}

// Same as peek(), and on success also advances m_divider past the value
// that was returned. The node itself is deleted later, by push().
bool pop( T& result )
{
bool res = this->peek( result );
if ( res )
{
m_divider = m_divider.load()->next;
}
return res;
}

private:
// Oldest node still allocated; a plain pointer, written by the constructor and push().
QueueNode *m_front;
// m_divider is advanced by pop(); m_tail is advanced by push().
QAtomicPointer<QueueNode> m_divider, m_tail;
};

#endif // ATOMICQUEUE_HPP


The queue breaks after I push and pop one item, and I can't figure out why. I'm quite new to atomic-style thread-safe programming, so it's quite possible that I just don't understand a specific aspect of this. When running in debug mode, I get a SIGSEGV (segmentation fault) in my `bool AtomicQueue::peek` function when it executes `result = next->data`.

Can anyone point out what I'm doing wrong?

Update:



So I fixed the problem, which was in my QueueNode destructor. Essentially when I deleted a node, it would attempt to clean up all further nodes, which then gave me a few different routes for fixing it:


  1. Set a flag in QueueNode that tells it not to delete all the ->next nodes

  2. Detach the front node before deleting it

  3. Allow the AtomicQueue to manage cleanup of nodes



I opted for the third option since it already did some cleanup while pushing new nodes, so here is the fixed class for anyone interested:

#ifndef ATOMICQUEUE_HPP
#define ATOMICQUEUE_HPP

#include <QAtomicPointer>


// Used http://www.drdobbs.com/parallel/writing-lock-free-code-a-corrected-queue/210604448?pgno=2
// as reference
template<class T>
class AtomicQueue
{
struct QueueNode
{
QueueNode( const T& value ) : next( NULL ), data( value ) {}
~QueueNode() { /*if ( next ) delete next;*/ }
QueueNode *next;
T data;
};

public:
AtomicQueue()
{
m_front = new QueueNode( T() );
m_tail.store( m_front );
m_divider.store( m_front );
}

~AtomicQueue()
{
QueueNode *node = m_front;
while( node->next )
{
QueueNode *n = node->next;
delete node;
node = n;
}
}

void push( const T& value )
{
m_tail.load()->next = new QueueNode( value );
m_tail = m_tail.load()->next; // This moves the QueueNode into the atomic pointer, making it safe :)
while( m_front != m_divider.load() )
{
QueueNode *tmp = m_front;
m_front = m_front->next;
delete tmp;
}
}

bool peek( T& result )
{
if ( m_divider.load() != m_tail.load() )
{
// Problem area
QueueNode *next = m_divider.load()->next;
if ( next )
{
result = next->data;
return true;
}
}
return false;
}

bool pop( T& result )
{
bool res = this->peek( result );
if ( res )
{
m_divider = m_divider.load()->next;
}
return res;
}

private:
QueueNode *m_front;
QAtomicPointer<QueueNode> m_divider, m_tail;
};

#endif // ATOMICQUEUE_HPP

Answer

Note: This is just copied from my question once I figured out the issue in my code. Thanks @peter-k for pointing out this didn't officially get answered.

So I fixed the problem, which was in my QueueNode destructor. Essentially when I deleted a node, it would attempt to clean up all further nodes, which then gave me a few different routes for fixing it:

  1. Set a flag in QueueNode that tells it not to delete all the ->next nodes
  2. Detach the front node before deleting it
  3. Allow the AtomicQueue to manage cleanup of nodes

I opted for the third option since it already did some cleanup while pushing new nodes, so here is the fixed class for anyone interested:

#ifndef ATOMICQUEUE_HPP
#define ATOMICQUEUE_HPP

#include <QAtomicPointer>


// Used http://www.drdobbs.com/parallel/writing-lock-free-code-a-corrected-queue/210604448?pgno=2
// as reference
template<class T>
class AtomicQueue
{
    struct QueueNode
    {
        QueueNode( const T& value ) : next( NULL ), data( value ) {}
        ~QueueNode() { /*if ( next ) delete next;*/ }
        QueueNode   *next;
        T           data;
    };

public:
    AtomicQueue()
    {
        m_front = new QueueNode( T() );
        m_tail.store( m_front );
        m_divider.store( m_front );
    }

    ~AtomicQueue()
    {
        QueueNode *node = m_front;
        while( node->next )
        {
            QueueNode *n = node->next;
            delete node;
            node = n;
        }
    }

    void push( const T& value )
    {
        m_tail.load()->next = new QueueNode( value );
        m_tail = m_tail.load()->next; // This moves the QueueNode into the atomic pointer, making it safe :)
        while( m_front != m_divider.load() )
        {
            QueueNode *tmp = m_front;
            m_front = m_front->next;
            delete tmp;
        }
    }

    bool peek( T& result )
    {
        if ( m_divider.load() != m_tail.load() )
        {
            // Problem area
            QueueNode *next = m_divider.load()->next;
            if ( next )
            {
                result = next->data;
                return true;
            }
        }
        return false;
    }

    bool pop( T& result )
    {
        bool res = this->peek( result );
        if ( res )
        {
            m_divider = m_divider.load()->next;
        }
        return res;
    }

private:
    QueueNode                   *m_front;
    QAtomicPointer<QueueNode>   m_divider, m_tail;
};

#endif // ATOMICQUEUE_HPP
Comments