Be Sharp Be Sharp -3 years ago 175
C# Question

Parallel.ForEach accessing a row in a DataTable more than once

I have a DataTable with 1k rows, and I'm using Parallel.ForEach to iterate through them.
The following method treats every row as a MailMessage: it initializes the message's parameters and then saves it to disk as an *.eml file.
Later, an SMTP queue picks up the .eml files and sends them.

public static bool GenerateValidEmlFiles(DataTable valids)
{
wroteToDb = false;
// init. cmp id from the table
CmpId = int.Parse(valids.Rows[0][CampaignId].ToString());

Parallel.ForEach(valids.AsEnumerable(), new ParallelOptions { MaxDegreeOfParallelism = 1 }, (currentRow) =>
{
CancellationTokenSource tokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30));
CancellationToken token = tokenSource.Token;

// create new task "thread" for every row in the DataTable
Task task = Task.Factory.StartNew(() =>
{
try
{


if (token.IsCancellationRequested)
token.ThrowIfCancellationRequested();
string id = currentRow[CampaignRecipientId].ToString();
writeEvent(int.Parse(id), "Recipient row is being processed");

// MailBee.Mime MailMessage
using (MailMessage msg = new MailMessage())
{
// init. parameters
msg.From.AsString = currentRow[EmailFrom].ToString();
msg.To.AsString = currentRow[EmailTo].ToString();
msg.ReplyTo.AsString = currentRow[EmailReplyTo].ToString();
msg.Subject = "=?UTF-8?B?" + Convert.ToBase64String(Encoding.UTF8.GetBytes(currentRow[EmailSubject].ToString())) + "?=";
msg.BodyHtmlText = HTMLTags.Replace("ReplaceBody", currentRow[EmailBody].ToString());
//assign the X-TWC id number to the header.
msg.Headers.Add(ConfigurationManager.AppSettings["TWCHeader"].ToString(), id, false);
//if there is an attachment add it to the Message
if ((currentRow[EmailAttachment] as object != null) && !string.IsNullOrEmpty(currentRow[EmailAttachmentName].ToString()))
{
byte[] filedata = (byte[])currentRow[EmailAttachment];
msg.Attachments.Add(filedata, currentRow[EmailAttachmentName].ToString(), "", "", null, NewAttachmentOptions.None, MailTransferEncoding.None);
}

msg.EncodeAllHeaders(Encoding.UTF8, MailBee.Mime.HeaderEncodingOptions.Base64);
//save message as *.Eml to be sent by the SMTP Queue
msg.SaveMessage(@"E:\WEBS\Ready\" + id + "_" + msg.To.AsString + ".eml");
writeEvent(int.Parse(id), "EML file written to Disk");

//adding ID to a list to write the whole list back to the DB in a single DB call.
if (!IDs.Contains(id))
IDs.Add(id);

}

}
catch (Exception ex)
{
using (StreamWriter sw = new StreamWriter(AppDomain.CurrentDomain.BaseDirectory + "\\INNER.txt", true))
{
sw.WriteLine(DateTime.Now.ToString() + ": " + ex.Source.ToString().Trim() + ", " + ex.Message.ToString().Trim() + ex.StackTrace);
sw.Flush();
sw.Close();
}
}

}, token);
task.Wait();
});

if (!wroteToDb)
{
WriteEmlEvents();
//set lock flag to true
wroteToDb = true;
return true;
}
return false;
}


The problem is that the first rows are being processed twice, i.e. accessed by the ForEach twice!
I checked the DataTable for duplicates and found no duplicate rows at all.
Would you recommend using a DataReader instead of a DataTable?

I tried the suggested answer and got the exception below.
FYI, it only happens when I remove MaxDegreeOfParallelism; when I set it to 1, it works fine.

8/15/2017 3:37:05 PM: MailBee.NET.45, IOException occurred. InnerException message follows: The process cannot access the file 'E:\WEBS\TerranovaQueue Files\Ready\749013_zz.000063@gioele.ca.eml' because it is being used by another process.
at a.n.b(String A_0, Byte[] A_1, Int32 A_2, Int32 A_3, Byte[] A_4)
at MailBee.Mime.MailMessage.SaveMessage(String filename)

System.AggregateException: One or more errors occurred. ---> System.InvalidOperationException: Collection was modified; enumeration operation might not execute.
at System.Data.RBTree`1.RBTreeEnumerator.MoveNext()
at System.Linq.Enumerable.<CastIterator>d__94`1.MoveNext()
at System.Collections.Concurrent.Partitioner.DynamicPartitionerForIEnumerable`1.InternalPartitionEnumerable.GrabChunk_Buffered(KeyValuePair`2[] destArray, Int32 requestedChunkSize, Int32& actualNumElementsGrabbed)
at System.Collections.Concurrent.Partitioner.DynamicPartitionerForIEnumerable`1.InternalPartitionEnumerable.GrabChunk(KeyValuePair`2[] destArray, Int32 requestedChunkSize, Int32& actualNumElementsGrabbed)
at System.Collections.Concurrent.Partitioner.DynamicPartitionerForIEnumerable`1.InternalPartitionEnumerator.GrabNextChunk(Int32 requestedChunkSize)
at System.Collections.Concurrent.Partitioner.DynamicPartitionEnumerator_Abstract`2.MoveNext()
at System.Threading.Tasks.Parallel.<>c__DisplayClass42_0`2.<PartitionerForEachWorker>b__1()
at System.Threading.Tasks.Task.InnerInvoke()
at System.Threading.Tasks.Task.InnerInvokeWithArg(Task childTask)
at System.Threading.Tasks.Task.<>c__DisplayClass176_0.<ExecuteSelfReplicating>b__0(Object )
--- End of inner exception stack trace ---
at System.Threading.Tasks.Task.ThrowIfExceptional(Boolean includeTaskCanceledExceptions)
at System.Threading.Tasks.Task.Wait(Int32 millisecondsTimeout, CancellationToken cancellationToken)
at System.Threading.Tasks.Task.Wait()
at System.Threading.Tasks.Parallel.PartitionerForEachWorker[TSource,TLocal](Partitioner`1 source, ParallelOptions parallelOptions, Action`1 simpleBody, Action`2 bodyWithState, Action`3 bodyWithStateAndIndex, Func`4 bodyWithStateAndLocal, Func`5 bodyWithEverything, Func`1 localInit, Action`1 localFinally)
at System.Threading.Tasks.Parallel.ForEachWorker[TSource,TLocal](IEnumerable`1 source, ParallelOptions parallelOptions, Action`1 body, Action`2 bodyWithState, Action`3 bodyWithStateAndIndex, Func`4 bodyWithStateAndLocal, Func`5 bodyWithEverything, Func`1 localInit, Action`1 localFinally)
at System.Threading.Tasks.Parallel.ForEach[TSource](IEnumerable`1 source, Action`1 body)
at EmailValidatorLibrary.EmailGenerator.GenerateValidEmlFiles(DataTable valids) in C:\Users\basel.abdo\Source\Terranova\Terranova-Preprod\TerranovaService\EmailValidatorLibrary\EmailGenerator.cs:line 50
at TerranovaService.TerranovaService.StartProcess() in C:\Users\basel.abdo\Source\Terranova\Terranova-Preprod\TerranovaService\TerranovaService\TerranovaService.cs:line 133
---> (Inner Exception #0) System.InvalidOperationException: Collection was modified; enumeration operation might not execute.
at System.Data.RBTree`1.RBTreeEnumerator.MoveNext()
at System.Linq.Enumerable.<CastIterator>d__94`1.MoveNext()
at System.Collections.Concurrent.Partitioner.DynamicPartitionerForIEnumerable`1.InternalPartitionEnumerable.GrabChunk_Buffered(KeyValuePair`2[] destArray, Int32 requestedChunkSize, Int32& actualNumElementsGrabbed)
at System.Collections.Concurrent.Partitioner.DynamicPartitionerForIEnumerable`1.InternalPartitionEnumerable.GrabChunk(KeyValuePair`2[] destArray, Int32 requestedChunkSize, Int32& actualNumElementsGrabbed)
at System.Collections.Concurrent.Partitioner.DynamicPartitionerForIEnumerable`1.InternalPartitionEnumerator.GrabNextChunk(Int32 requestedChunkSize)
at System.Collections.Concurrent.Partitioner.DynamicPartitionEnumerator_Abstract`2.MoveNext()
at System.Threading.Tasks.Parallel.<>c__DisplayClass42_0`2.<PartitionerForEachWorker>b__1()
at System.Threading.Tasks.Task.InnerInvoke()
at System.Threading.Tasks.Task.InnerInvokeWithArg(Task childTask)
at System.Threading.Tasks.Task.<>c__DisplayClass176_0.<ExecuteSelfReplicating>b__0(Object )<---

---> (Inner Exception #1) System.IO.IOException: The process cannot access the file 'E:\WEBS\TWC Mail Services\INNER.txt' because it is being used by another process.
at System.IO.__Error.WinIOError(Int32 errorCode, String maybeFullPath)
at System.IO.FileStream.Init(String path, FileMode mode, FileAccess access, Int32 rights, Boolean useRights, FileShare share, Int32 bufferSize, FileOptions options, SECURITY_ATTRIBUTES secAttrs, String msgPath, Boolean bFromProxy, Boolean useLongPath, Boolean checkHost)
at System.IO.FileStream..ctor(String path, FileMode mode, FileAccess access, FileShare share, Int32 bufferSize, FileOptions options, String msgPath, Boolean bFromProxy, Boolean useLongPath, Boolean checkHost)
at System.IO.StreamWriter.CreateFile(String path, Boolean append, Boolean checkHost)
at System.IO.StreamWriter..ctor(String path, Boolean append, Encoding encoding, Int32 bufferSize, Boolean checkHost)
at System.IO.StreamWriter..ctor(String path, Boolean append)
at EmailValidatorLibrary.EmailGenerator.<>c.<GenerateValidEmlFiles>b__19_0(DataRow currentRow) in C:\Users\basel.abdo\Source\Terranova\Terranova-Preprod\TerranovaService\EmailValidatorLibrary\EmailGenerator.cs:line 107
at System.Threading.Tasks.Parallel.<>c__DisplayClass42_0`2.<PartitionerForEachWorker>b__1()
at System.Threading.Tasks.Task.InnerInvoke()
at System.Threading.Tasks.Task.InnerInvokeWithArg(Task childTask)
at System.Threading.Tasks.Task.<>c__DisplayClass176_0.<ExecuteSelfReplicating>b__0(Object )<---
mscorlib

Answer Source

What you are experiencing is a classic problem with a closure. The variable currentRow participates in a closure, meaning (among other things) that its value might change before the Task has a chance to execute.

You can eliminate the problem by getting rid of the Task. Just execute the code directly in the lambda expression that you are passing to Parallel.ForEach (and increase its max degree of parallelism). It is already executing on a separate thread (due to the Parallel.ForEach) so there is absolutely no point in wrapping it in a Task too.

public static bool GenerateValidEmlFiles(DataTable valids)
{
    wroteToDb = false;
    // init. cmp id from the table
    CmpId = int.Parse(valids.Rows[0][CampaignId].ToString());

    Parallel.ForEach(valids.AsEnumerable(), new ParallelOptions { MaxDegreeOfParallelism = 10 }, (currentRow) =>
    {                
        string id = currentRow[CampaignRecipientId].ToString();
        writeEvent(int.Parse(id), "Recipient row is being processed");

        // MailBee.Mime MailMessage
        using (MailMessage msg = new MailMessage())
        {
            //etc. etc.

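P.S. Make sure IDs is a type that supports concurrent access (e.g. a ConcurrentBag<string>, since id is a string in this code) and make sure writeEvent is thread-safe. Keep in mind that a Contains check followed by Add is not atomic, even on a concurrent collection, so two threads can still both add the same id.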
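
To make the P.S. concrete, here is a minimal sketch of the thread-safety side only; it is not the original poster's code. The names EmlGenerationSketch, CollectIds and LogError are hypothetical, the degree of parallelism of 4 is arbitrary, and the MailBee part is left out. It assumes three things: that copying the rows into an array before the loop is acceptable (so the loop does not enumerate the live DataTable), that a ConcurrentDictionary can stand in for IDs (TryAdd replaces the Contains/Add pair atomically), and that a lock around the error log is enough to avoid the "being used by another process" error on INNER.txt. Whether the snapshot also removes the "Collection was modified" exception depends on what else touches the table, so treat that as something to verify.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data;
using System.IO;
using System.Threading.Tasks;

public static class EmlGenerationSketch
{
    // Hypothetical: guards the shared error log so only one thread writes at a time.
    private static readonly object LogLock = new object();

    public static void LogError(string logPath, Exception ex)
    {
        lock (LogLock)
        {
            File.AppendAllText(logPath, DateTime.Now + ": " + ex.Message + Environment.NewLine);
        }
    }

    // Hypothetical helper: processes every row in parallel and returns the distinct ids seen.
    public static ICollection<string> CollectIds(DataTable valids, string idColumn, string logPath)
    {
        // Snapshot the rows first; the parallel loop then iterates a plain array.
        DataRow[] rows = valids.Select();

        // Used as a concurrent set: the byte value is ignored.
        var ids = new ConcurrentDictionary<string, byte>();

        Parallel.ForEach(rows, new ParallelOptions { MaxDegreeOfParallelism = 4 }, row =>
        {
            try
            {
                string id = row[idColumn].ToString();
                // ... build and save the message for this row here ...
                ids.TryAdd(id, 0); // atomic "add if missing", no separate Contains check
            }
            catch (Exception ex)
            {
                LogError(logPath, ex);
            }
        });

        return ids.Keys;
    }

    public static void Main()
    {
        var table = new DataTable();
        table.Columns.Add("CampaignRecipientId", typeof(int));
        for (int i = 0; i < 100; i++)
        {
            table.Rows.Add(i);
        }

        string logPath = Path.Combine(Path.GetTempPath(), "INNER.txt");
        ICollection<string> ids = CollectIds(table, "CampaignRecipientId", logPath);
        Console.WriteLine(ids.Count);
    }
}
```

The same pattern would apply to writeEvent if it writes to a shared file or list: either give it its own lock or have each iteration collect its events locally and write them once after the loop.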
