Syntactic Fructose Syntactic Fructose - 2 months ago 12
C++ Question

Efficiently waiting for all tasks in a threadpool to finish

I currently have a program with x workers in my threadpool. During the main loop y tasks are assigned to the workers to complete, but after the tasks are sent out I must wait for all tasks for finish before preceding with the program. I believe my current solution is inefficient, there must be a better way to wait for all tasks to finish but I am not sure how to go about this

// called in main after all tasks are enqueued to
// std::deque<std::function<void()>> tasks
void ThreadPool::waitFinished()
{
while(!tasks.empty()) //check if there are any tasks in queue waiting to be picked up
{
//do literally nothing
}
}





More information:

threadpool structure

//worker thread objects
class Worker {
public:
Worker(ThreadPool& s): pool(s) {}
void operator()();
private:
ThreadPool &pool;
};

//thread pool
class ThreadPool {
public:
ThreadPool(size_t);
template<class F>
void enqueue(F f);
void waitFinished();
~ThreadPool();
private:
friend class Worker;
//keeps track of threads so we can join
std::vector< std::thread > workers;
//task queue
std::deque< std::function<void()> > tasks;
//sync
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
};


or here's a gist of my threadpool.hpp

example of what I want to use
waitFinished()
for:

while(running)
//....
for all particles alive
push particle position function to threadpool
end for

threadPool.waitFinished();

push new particle position data into openGL buffer
end while


so this way I can send hundrends of thousands of particle position tasks to be done in parallel, wait for them to finish and put the new data inside the openGL position buffers

Answer

This is one way to do what you're trying. Using two condition variables on the same mutex is not for the light-hearted unless you know what is going on internally. I didn't need the atomic processed member other than my desire to demonstrate how many items were finished between each run.

The sample workload function in this generates one million random int values, then sorts them (gotta heat my office one way or another). waitFinished will not return until the queue is empty and no threads are busy.

#include <iostream>
#include <deque>
#include <functional>
#include <thread>
#include <condition_variable>
#include <mutex>
#include <random>

//thread pool
class ThreadPool
{
public:
    ThreadPool(unsigned int n = std::thread::hardware_concurrency());

    template<class F> void enqueue(F&& f);
    void waitFinished();
    ~ThreadPool();

    unsigned int getProcessed() const { return processed; }

private:
    std::vector< std::thread > workers;
    std::deque< std::function<void()> > tasks;
    std::mutex queue_mutex;
    std::condition_variable cv_task;
    std::condition_variable cv_finished;
    std::atomic_uint processed;
    unsigned int busy;
    bool stop;

    void thread_proc();
};

ThreadPool::ThreadPool(unsigned int n)
    : busy()
    , processed()
    , stop()
{
    for (unsigned int i=0; i<n; ++i)
        workers.emplace_back(std::bind(&ThreadPool::thread_proc, this));
}

ThreadPool::~ThreadPool()
{
    // set stop-condition
    std::unique_lock<std::mutex> latch(queue_mutex);
    stop = true;
    cv_task.notify_all();
    latch.unlock();

    // all threads terminate, then we're done.
    for (auto& t : workers)
        t.join();
}

void ThreadPool::thread_proc()
{
    while (true)
    {
        std::unique_lock<std::mutex> latch(queue_mutex);
        cv_task.wait(latch, [this](){ return stop || !tasks.empty(); });
        if (!tasks.empty())
        {
            // got work. set busy.
            ++busy;

            // pull from queue
            auto fn = tasks.front();
            tasks.pop_front();

            // release lock. run async
            latch.unlock();

            // run function outside context
            fn();
            ++processed;

            latch.lock();
            --busy;
            cv_finished.notify_one();
        }
        else if (stop)
        {
            break;
        }
    }
}

// generic function push
template<class F>
void ThreadPool::enqueue(F&& f)
{
    std::unique_lock<std::mutex> lock(queue_mutex);
    tasks.emplace_back(std::forward<F>(f));
    cv_task.notify_one();
}

// waits until the queue is empty.
void ThreadPool::waitFinished()
{
    std::unique_lock<std::mutex> lock(queue_mutex);
    cv_finished.wait(lock, [this](){ return tasks.empty() && (busy == 0); });
}

// a cpu-busy task.
void work_proc()
{
    std::random_device rd;
    std::mt19937 rng(rd());

    // build a vector of random numbers
    std::vector<int> data;
    data.reserve(100000);
    std::generate_n(std::back_inserter(data), data.capacity(), [&](){ return rng(); });
    std::sort(data.begin(), data.end(), std::greater<int>());
}

int main()
{
    ThreadPool tp;

    // run five batches of 100 items
    for (int x=0; x<5; ++x)
    {
        // queue 100 work tasks
        for (int i=0; i<100; ++i)
            tp.enqueue(work_proc);

        tp.waitFinished();
        std::cout << tp.getProcessed() << '\n';
    }

    // destructor will close down thread pool
    return EXIT_SUCCESS;
}

Output

100
200
300
400
500

Best of luck.

Comments