Vincent Vincent - 2 months ago 29
C++ Question

Parallel search of distinct values?

Consider the following code:

// Preprocessor
#include <iostream>
#include <chrono>
#include <thread>
#include <algorithm>
#include <mutex>
#include <random>
#include <vector>

// Main function
int main()
{
    // A random vector of size 100 with 10 different random values
    std::vector<unsigned int> vector = make_random_vector(100, 10);
    // At the end, the result should be the 10 different random values
    std::vector<unsigned int> result;
    // Mutex to deal with concurrency
    std::mutex mutex;
    // Parallel search
    parallel_for_each(vector.begin(), vector.end(),
    [=, &result, &mutex](const unsigned int& i){
        /* CRITICAL SECTION: BEGIN */
        // If the current element is not yet in the resulting vector, inserts it
        if (!std::binary_search(result.begin(), result.end(), i)) {
            mutex.lock();
            result.insert(std::lower_bound(result.begin(), result.end(), i), i);
            mutex.unlock();
        }
        /* CRITICAL SECTION: END */
    });
    // Unique values
    result.erase(std::unique(result.begin(), result.end()), result.end());
    // Display the result
    std::for_each(result.begin(), result.end(),
    [](const unsigned int& i){
        std::cout<<i<<std::endl;
    });
    // Finalization
    return 0;
}


The goal is to find the n distinct values in a vector in parallel.

My question is: is the preceding code free of concurrency problems, and if not, how can it be corrected?




Note: this code calls two functions:

parallel_for_each
which executes the provided function over the range on the provided number of threads:

// Parallel execution returning the execution time in seconds
template <class Iterator, class Function>
double parallel_for_each(const Iterator& first, const Iterator& last, Function&& function, const int nthreads = std::thread::hardware_concurrency())
{
    const std::chrono::high_resolution_clock::time_point tbegin = std::chrono::high_resolution_clock::now();
    const long long int ntasks = std::max(static_cast<int>(1), nthreads);
    const long long int group = std::max(static_cast<long long int>(first < last), static_cast<long long int>((last-first)/ntasks));
    std::vector<std::thread> threads;
    Iterator it = first;
    threads.reserve(ntasks);
    for (it = first; it < last-group; it += group) {
        threads.push_back(std::thread([=, &last, &group, &function](){std::for_each(it, std::min(it+group, last), function);}));
    }
    std::for_each(it, last, function);
    std::for_each(threads.begin(), threads.end(), [](std::thread& current){current.join();});
    return std::chrono::duration_cast<std::chrono::duration<double> >(std::chrono::high_resolution_clock::now()-tbegin).count();
}


make_random_vector
which produces a random vector of nelements elements taken from nvalues different random values:

// Produces a random vector of nelements with nvalues different random values
std::vector<unsigned int> make_random_vector(const unsigned int nelements, const unsigned int nvalues)
{
    std::vector<unsigned int> vector(nelements);
    std::vector<unsigned int> values(nvalues);
    std::random_device device;
    std::mt19937 engine(device());
    std::uniform_int_distribution<unsigned int> distribution1;
    std::uniform_int_distribution<unsigned int> distribution2(0, nvalues-1);
    std::for_each(values.begin(), values.end(), [=, &distribution1, &engine](unsigned int& i){i = distribution1(engine);});
    std::for_each(vector.begin(), vector.end(), [=, &distribution2, &engine, &values](unsigned int& i){i = values[distribution2(engine)];});
    return vector;
}

Answer

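Your code has a problem: the mutex only protects concurrent writes to result, not the reads. std::binary_search reads result outside the lock while another thread may be inside insert, which can shift or reallocate the elements and invalidate the iterators being read. That is a data race.

One solution is to move the mutex locking outside of the if, as follows: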

[=, &result, &mutex](const unsigned int& i){
    std::lock_guard<std::mutex> lck (mutex);

    // If the current element is not yet in the resulting vector, inserts it
    if (!std::binary_search(result.begin(), result.end(), i)) {
        result.insert(std::lower_bound(result.begin(), result.end(), i), i);
    }
}

but this serializes the whole loop body, which defeats the purpose of the parallel for :/

Another solution would be to have each thread work on its own result set, and merge the results at the end of the loop.
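A minimal sketch of the per-thread result set idea, assuming the input is a std::vector<unsigned int> as in the question. The function name distinct_values_merged and its chunking scheme are hypothetical, not part of the question's code; it does not reuse parallel_for_each, because each thread needs to know which private vector it owns.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical helper (not from the question): returns the sorted distinct
// values of `input`, computed with `nthreads` worker threads.
std::vector<unsigned int> distinct_values_merged(const std::vector<unsigned int>& input,
                                                 std::size_t nthreads)
{
    assert(nthreads > 0);
    // One private result vector per thread: no shared mutable state, so no mutex.
    std::vector<std::vector<unsigned int>> partial(nthreads);
    std::vector<std::thread> threads;
    threads.reserve(nthreads);
    // Chunk size, rounded up so that all elements are covered.
    const std::size_t chunk = (input.size() + nthreads - 1) / nthreads;

    for (std::size_t t = 0; t < nthreads; ++t) {
        const std::size_t begin = std::min(input.size(), t * chunk);
        const std::size_t end = std::min(input.size(), begin + chunk);
        threads.emplace_back([&input, &partial, t, begin, end] {
            // Each thread only touches partial[t].
            std::vector<unsigned int>& local = partial[t];
            local.assign(input.begin() + static_cast<std::ptrdiff_t>(begin),
                         input.begin() + static_cast<std::ptrdiff_t>(end));
            std::sort(local.begin(), local.end());
            local.erase(std::unique(local.begin(), local.end()), local.end());
        });
    }
    for (std::thread& th : threads) {
        th.join();
    }

    // Sequential merge of the (small) per-thread sets after all threads joined.
    std::vector<unsigned int> result;
    for (const std::vector<unsigned int>& local : partial) {
        result.insert(result.end(), local.begin(), local.end());
    }
    std::sort(result.begin(), result.end());
    result.erase(std::unique(result.begin(), result.end()), result.end());
    return result;
}
```

With the question's data this would be called as distinct_values_merged(vector, std::max(1u, std::thread::hardware_concurrency())). The merge step is sequential, but it only handles at most nthreads times the number of distinct values, which is small when there are few distinct values.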

Another solution may be a variant of double-checked locking, but it requires copying result at each insertion.
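One possible reading of that variant, as a sketch only: the sorted result is published as an immutable snapshot behind a std::shared_ptr, readers check the snapshot without taking the mutex, and a writer re-checks under the mutex, copies the vector, inserts into the copy and publishes it. The class SnapshotSet and the function distinct_values_snapshot are hypothetical names. The sketch assumes the std::atomic_load / std::atomic_store overloads for std::shared_ptr from <memory>, which are deprecated since C++20 in favour of std::atomic<std::shared_ptr<T>> and are not guaranteed to be lock-free.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical class (not from the question): a sorted set of distinct values
// published as an immutable snapshot that readers never modify in place.
class SnapshotSet {
public:
    using Values = std::vector<unsigned int>;

    SnapshotSet() : snapshot_(std::make_shared<Values>()) {}

    void insert(unsigned int value)
    {
        // First check, without the mutex, on the current immutable snapshot.
        std::shared_ptr<const Values> current = std::atomic_load(&snapshot_);
        if (std::binary_search(current->begin(), current->end(), value)) {
            return;
        }
        std::lock_guard<std::mutex> lock(mutex_);
        // Second check, under the mutex: another thread may have inserted it.
        current = std::atomic_load(&snapshot_);
        if (std::binary_search(current->begin(), current->end(), value)) {
            return;
        }
        // Copy the snapshot, insert into the copy, then publish the copy.
        Values copy(*current);
        copy.insert(std::lower_bound(copy.begin(), copy.end(), value), value);
        std::atomic_store(&snapshot_,
            std::shared_ptr<const Values>(std::make_shared<Values>(std::move(copy))));
    }

    // Returns a copy of the current snapshot.
    Values values() const { return *std::atomic_load(&snapshot_); }

private:
    std::shared_ptr<const Values> snapshot_;
    std::mutex mutex_;
};

// Hypothetical driver: thread t handles elements t, t + nthreads, ...
std::vector<unsigned int> distinct_values_snapshot(const std::vector<unsigned int>& input,
                                                   std::size_t nthreads)
{
    assert(nthreads > 0);
    SnapshotSet set;
    std::vector<std::thread> threads;
    threads.reserve(nthreads);
    for (std::size_t t = 0; t < nthreads; ++t) {
        threads.emplace_back([&input, &set, t, nthreads] {
            for (std::size_t k = t; k < input.size(); k += nthreads) {
                set.insert(input[k]);
            }
        });
    }
    for (std::thread& th : threads) {
        th.join();
    }
    return set.values();
}
```

The copy on every insertion is only acceptable when insertions are rare compared to lookups, which is the case here: with 10 distinct values there are at most 10 copies, however long the input is.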