Alex Zywicki Alex Zywicki - 1 month ago 16
C++ Question

Thread Pool: Block destruction until all work is done

I have the following thread pool implementation:

template<typename... event_args>
class thread_pool{
public:

using handler_type = std::function<void(event_args...)>;

thread_pool(handler_type&& handler, std::size_t N = 4, bool finish_before_exit = true) : _handler(std::forward<handler_type&&>(handler)),_workers(N),_running(true),_finish_work_before_exit(finish_before_exit)
{
for(auto&& worker: _workers)
{
//worker function
worker = std::thread([this]()
{
while (_running)
{
//wait for work
std::unique_lock<std::mutex> _lk{_wait_mutex};
_cv.wait(_lk, [this]{
return !_events.empty() || !_running;
});
//_lk unlocked

//check to see why we woke up
if (!_events.empty()) {//was it new work
std::unique_lock<std::mutex> _readlk(_queue_mutex);
auto data = _events.front();
_events.pop();
_readlk.unlock();

invoke(std::move(_handler), std::move(data));
_cv.notify_all();
}else if(!_running){//was it a signal to exit
break;
}
//or was it spurious and we should just ignore it
}
});
//end worker function
}
}

~thread_pool()
{
if(_finish_work_before_exit)
{//block destruction until all work is done
std::condition_variable _work_remains;
std::mutex _wr;

std::unique_lock<std::mutex> lk{_wr};
_work_remains.wait(lk,[this](){
return _events.empty();
});
}

_running=false;

//let all workers know to exit
_cv.notify_all();


//attempt to join all workers
for(auto&& _worker: _workers)
{
if(_worker.joinable())
{
_worker.join();
}
}
}

handler_type& handler()
{
return _handler;
}

void propagate(event_args&&... args)
{
//lock before push
std::unique_lock<std::mutex> _lk(_queue_mutex);
{
_events.emplace(std::make_tuple(args...));
}
_lk.unlock();//explicit unlock
_cv.notify_one();//let worker know that data is available
}

private:
bool _finish_work_before_exit;

handler_type _handler;

std::queue<std::tuple<event_args...>> _events;

std::vector<std::thread> _workers;

std::atomic_bool _running;

std::condition_variable _cv;

std::mutex _wait_mutex;

std::mutex _queue_mutex;


//helpers used to unpack tuple into function call
template<typename Func, typename Tuple, std::size_t... I>
auto invoke_(Func&& func, Tuple&& t, std::index_sequence<I...>)
{
return func(std::get<I>(std::forward<Tuple&&>(t))...);
}

template<typename Func, typename Tuple, typename Indicies = std::make_index_sequence<std::tuple_size<Tuple>::value>>
auto invoke(Func&& func, Tuple&& t)
{
return invoke_(std::forward<Func&&>(func), std::forward<Tuple&&>(t), Indicies());
}
};


I recently added this section to the destructor:

if(_finish_work_before_exit)
{//block destruction until all work is done
std::condition_variable _work_remains;
std::mutex _wr;

std::unique_lock<std::mutex> lk{_wr};
_work_remains.wait(lk,[this](){
return _events.empty();
});
}


The intent was to have the destructor block until the work queue was fully consumed.

But it seems to put the program into deadlock. aAll of the work does get completed, but the wait does not seem to end when the work is done.

Consider this example main:

std::mutex writemtx;


thread_pool<int> pool{
[&](int i){
std::unique_lock<std::mutex> lk{writemtx};
std::cout<<i<<" : "<<std::this_thread::get_id()<<std::endl;
},
8//threads
};

for (int i=0; i<8192; ++i) {
pool.propagate(std::move(i));
}


How can I have the destructor wait for the completion of the work without causing deadlock?

Answer

The reason your code is deadlocked is that _work_remains is a condition variable which is not "notified" by any part of your code. You would need to make that a class attribute and have it notified by any thread that picks up the last event from the _events.