Buyukcaglar - 2 months ago
C# Question

Parallel.ForEach Source List with Where Condition

I have a code block that processes StoreProducts and then adds or updates them in the database in a foreach loop, but this is slow. When I convert the code to a Parallel.ForEach block, the same products get both added and updated at the same time. I could not figure out how to use Parallel.ForEach safely for the following functionality; any help would be appreciated.

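// IsNullOrWhiteSpace covers null, empty and whitespace-only values without calling Trim() on a null string
var validProducts = storeProducts.Where(p => p.Price2 > 0
        && !string.IsNullOrWhiteSpace(p.ProductAtt08Desc)
        && !string.IsNullOrWhiteSpace(p.Barcode)
    ).ToList();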

var processedProductCodes = new List<string>();

var po = new ParallelOptions()
{
    MaxDegreeOfParallelism = 4
};

Parallel.ForEach(validProducts.Where(p => !processedProductCodes.Contains(p.ProductCode)), po,
    (product) =>
    {
        lock (_lockThis)
        {
            processedProductCodes.Add(product.ProductCode);
        }

        // Check if Product Exists in Db

        // if product is not in Db Add to Db

        // if product is in Db Update product in Db

    });


The catch is that validProducts may contain several entries with the same ProductCode (they are variants), and once one of them has been processed, the others must not be processed again.

So the Where condition in the Parallel.ForEach call, validProducts.Where(p => !processedProductCodes.Contains(p.ProductCode)), does not work the way it does in a normal foreach loop.

Answer

Parallel.ForEach buffers items internally for each thread. One option is to switch to a partitioner that does not use buffering:

var pat = Partitioner.Create(validProducts.Where(p => !processedProductCodes.Contains(p.ProductCode))
                            ,EnumerablePartitionerOptions.NoBuffering);

Parallel.ForEach(pat, po, (product) => ...

That will get you closer, but you will still have a race condition in which two items with the same ProductCode can both be processed, because the loop body never bails out when it finds a duplicate.
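To make that race concrete, here is a minimal single-threaded sketch of the interleaving. It is not what Parallel.ForEach does internally; it only simulates two workers pulling an item from the lazy Where filter before either has recorded its code. The sample codes "A", "A", "B" are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var codes = new List<string> { "A", "A", "B" };   // hypothetical product codes
var processed = new List<string>();

// Where is lazy: Contains runs when an item is pulled from the sequence,
// not when the loop body later records the code.
IEnumerable<string> filtered = codes.Where(c => !processed.Contains(c));

using (IEnumerator<string> source = filtered.GetEnumerator())
{
    // Simulate two workers that each pull an item before either reaches Add.
    source.MoveNext();
    string first = source.Current;   // "A"
    source.MoveNext();
    string second = source.Current;  // "A" again: processed is still empty

    processed.Add(first);
    processed.Add(second);
}

Console.WriteLine(string.Join(",", processed)); // prints A,A
```

Both variants pass the filter because the check and the Add are separate steps. The check has to happen inside the same lock as the Add for the duplicate to be caught.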

The better option is to switch processedProductCodes to a HashSet<string> and change your code to:

var processedProductCodes = new HashSet<string>();

var po = new ParallelOptions()
{
    MaxDegreeOfParallelism = 4
};

Parallel.ForEach(validProducts, po,
    (product) =>
    {
        // You can safely lock on processedProductCodes
        lock (processedProductCodes)
        {
            if (!processedProductCodes.Add(product.ProductCode))
            {
                // Add returns false if the code is already in the collection.
                return;
            }
        }

        // Check if Product Exists in Db

        // if product is not in Db Add to Db

        // if product is in Db Update product in Db

    });

HashSet<string> has much faster lookups than List<string>, and the membership check is built into Add, which returns false when the code is already in the set.
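If you want to convince yourself that each code is handled only once, a small self-contained check along these lines should do. The sample data (1,000 items over 100 codes) and the timesProcessed counter are stand-ins I made up; the counter takes the place of the real database add/update.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical sample data: 1,000 items spread over 100 product codes (10 variants each).
var productCodes = Enumerable.Range(0, 1000).Select(i => "P" + (i % 100)).ToList();

var processedProductCodes = new HashSet<string>();
var timesProcessed = new ConcurrentDictionary<string, int>();
var po = new ParallelOptions { MaxDegreeOfParallelism = 4 };

Parallel.ForEach(productCodes, po, code =>
{
    lock (processedProductCodes)
    {
        // Check and record in one step while holding the lock.
        if (!processedProductCodes.Add(code))
        {
            return; // another variant already claimed this code
        }
    }

    // Stand-in for the database add/update, done outside the lock.
    timesProcessed.AddOrUpdate(code, 1, (_, n) => n + 1);
});

Console.WriteLine(timesProcessed.Count);                    // prints 100
Console.WriteLine(timesProcessed.Values.All(n => n == 1));  // prints True
```

Note that which variant of a given ProductCode wins is not deterministic, so this only fits if any one variant is acceptable. If you would rather not take a lock at all, ConcurrentDictionary<string, byte>.TryAdd gives the same "false if already present" behavior and could replace the HashSet here.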
