Luca Luca - 2 months ago 10
C++ Question

Thread safe array of buffers

I am currently refactoring some code I found in the nvidia hardware encoder for compressing video images. The original question is here: wondering if I can use stl smart pointers for this

Based on the answers, I have updated my code as follows:

Based on the answers and the comments, I have tried to make a thread-safe buffer array. Here it is. Please comment.

#ifndef __BUFFER_ARRAY_H__
#define __BUFFER_ARRAY_H__

#include <vector>
#include <mutex>
#include <thread>

template<class T>
class BufferArray
{
public:
class BufferArray()
:num_pending_items(0), pending_index(0), available_index(0)
{}

// This method is not thread-safe.
// Add an item to our buffer list
// Note we do not take ownership of the incoming pointer.
void add(T * buffer)
{
buffer_array.push_back(buffer);
}

// Returns a naked pointer to an available buffer. Should not be
// deleted by the caller.
T * get_available()
{
std::lock_guard<std::mutex> lock(buffer_array_mutex);
if (num_pending_items == buffer_array.size()) {
return NULL;
}
T * buffer = buffer_array[available_index];
// Update the indexes.
available_index = (available_index + 1) % buffer_array.size();
num_pending_items += 1;
return buffer;
}

T * get_pending()
{
std::lock_guard<std::mutex> lock(buffer_array_mutex);
if (num_pending_items == 0) {
return NULL;
}

T * buffer = buffer_array[pending_index];
pending_index = (pending_index + 1) % buffer_array.size();
num_pending_items -= 1;
return buffer;
}


private:
std::vector<T * > buffer_array;
std::mutex buffer_array_mutex;
unsigned int num_pending_items;
unsigned int pending_index;
unsigned int available_index;

// No copy semantics
BufferArray(const BufferArray &) = delete;
void operator=(const BufferArray &) = delete;
};

#endif


My question is whether I am breaking some C++ good practice recommendations here? Also, I am expending the class so that it can be accessed and used my multiple threads. I was wondering if there is anything that I might have missed.

Answer

I think I'd approach it something like this:

In this test, the "processing" is just multiplying an int by 2. But notice how the processor thread takes pending data off a pending queue, processes it, then pushes available data to the available queue. Then it signals (via the condition variable) that the consumer (in this case, your disk-writer) should look again for available data.

#include <vector>
#include <mutex>
#include <thread>
#include <queue>
#include <condition_variable>
#include <iostream>

namespace notstd {
    template<class Mutex> auto getlock(Mutex& m)
    {
        return std::unique_lock<Mutex>(m);
    }
}

template<class T>
class ProcessQueue
{
public:
    ProcessQueue()
    {}

    // This method is not thread-safe.
    // Add an item to our buffer list
    // Note we do not take ownership of the incoming pointer.
    // @pre start_processing shall not have been called
    void add(T * buffer)
    {
        pending_.push(buffer);
    }

    void start_processing()
    {
        process_thread_ = std::thread([this] {
            while(not this->pending_.empty())
            {
                auto lock = notstd::getlock(this->mutex_);
                auto buf = this->pending_.front();
                lock.unlock();

                //
                // this is the part that processes the "buffer"

                *buf *= 2;

                //
                // now notify the structure that the processing is done - buffer is available
                //

                lock.lock();
                this->pending_.pop();
                this->available_.push(buf);
                lock.unlock();
                this->change_.notify_one();
            }
        });
    }

    T* wait_available()
    {
        auto lock = notstd::getlock(mutex_);
        change_.wait(lock, [this] { return not this->available_.empty() or this->pending_.empty(); });
        if (not available_.empty())
        {
            auto p = available_.front();
            available_.pop();
            return p;
        }

        lock.unlock();
        process_thread_.join();
        return nullptr;
    }

private:
    std::queue<T * >                   pending_;
    std::queue<T * >                   available_;
    std::mutex                          mutex_;
    std::condition_variable             change_;
    std::thread                     process_thread_;

    // No copy semantics - implicit because of the mutex
};

int main()
{
    ProcessQueue<int> pq;

    std::vector<int> v = { 1, 2, 3, 4, 5, 6, 7, 8, 9 };
    for (auto& i : v) {
        pq.add(std::addressof(i));
    }

    pq.start_processing();

    while (auto p = pq.wait_available())
    {
        std::cout << *p << '\n';
    }
}

expected output:

2
4
6
8
10
12
14
16
18