mike01010 - 3 months ago
C# Question

Why does Parallel.ForEach create endless threads?

The code below keeps creating threads, even when the queue is empty, until an OutOfMemoryException eventually occurs. If I replace Parallel.ForEach with a regular foreach, this does not happen. Does anyone know why this might happen?

public delegate void DataChangedDelegate(DataItem obj);

public class Consumer
{
    public DataChangedDelegate OnCustomerChanged;
    public DataChangedDelegate OnOrdersChanged;

    private CancellationTokenSource cts;
    private CancellationToken ct;
    private BlockingCollection<DataItem> queue;

    public Consumer(BlockingCollection<DataItem> queue) {
        this.queue = queue;
        Start();
    }

    private void Start() {
        cts = new CancellationTokenSource();
        ct = cts.Token;
        // Run the consuming loop on a background task.
        Task.Factory.StartNew(() => DoWork(), ct);
    }

    private void DoWork() {
        // No ParallelOptions are passed, so the degree of parallelism is unbounded.
        Parallel.ForEach(queue.GetConsumingPartitioner(), item => {
            if (item.DataType == DataTypes.Customer) {
                OnCustomerChanged(item);
            } else if (item.DataType == DataTypes.Order) {
                OnOrdersChanged(item);
            }
        });
    }
}

Answer

I think Parallel.ForEach() was designed primarily for processing bounded collections, and it doesn't expect collections like the one returned by GetConsumingPartitioner(), where MoveNext() can block for a long time.

The problem is that Parallel.ForEach() tries to find the best degree of parallelism, so it starts as many Tasks as the TaskScheduler lets it run. The TaskScheduler, in turn, sees many Tasks that take a very long time to finish and that aren't doing any work (they are blocked waiting for items), so it keeps starting new ones. Each blocked Task occupies a thread, which is why the thread count grows until memory runs out.

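I think the best solution is to set MaxDegreeOfParallelism on the ParallelOptions you pass to Parallel.ForEach(), which caps the number of Tasks (and therefore blocked threads) the loop can use.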
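A minimal sketch of capping the loop, under some assumptions: the class name BoundedConsumerSketch, the Consume method, and the int items are hypothetical stand-ins for the Consumer and DataItem in the question. Because GetConsumingPartitioner() is an extension method that does not ship with the base class library, this sketch substitutes Partitioner.Create over GetConsumingEnumerable() with NoBuffering, which should behave similarly for this purpose.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedConsumerSketch
{
    // Drains the queue with at most maxWorkers concurrent workers
    // and returns the number of items handled.
    public static int Consume(BlockingCollection<int> queue, int maxWorkers, CancellationToken ct)
    {
        var options = new ParallelOptions
        {
            MaxDegreeOfParallelism = maxWorkers, // upper bound on concurrent Tasks
            CancellationToken = ct               // lets the caller stop the loop
        };

        // Assumption: stand-in for queue.GetConsumingPartitioner().
        // NoBuffering hands items to workers one at a time instead of in chunks.
        var source = Partitioner.Create(
            queue.GetConsumingEnumerable(ct),
            EnumerablePartitionerOptions.NoBuffering);

        int handled = 0;
        Parallel.ForEach(source, options, item =>
        {
            // Real code would dispatch on the item type here.
            Interlocked.Increment(ref handled);
        });
        return handled;
    }

    public static void Main()
    {
        var queue = new BlockingCollection<int>();
        for (int i = 0; i < 100; i++) queue.Add(i);
        queue.CompleteAdding(); // without this the loop waits for more items forever

        Console.WriteLine(Consume(queue, 4, CancellationToken.None)); // prints 100
    }
}
```

With a cap in place, the workers still block while the queue is empty, but there are never more than maxWorkers of them, so the thread count stays flat.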

As an alternative, you could use TPL Dataflow's ActionBlock. The main difference in this case is that ActionBlock doesn't block any threads when there are no items to process, so the number of threads wouldn't get anywhere near the limit.
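A rough sketch of the ActionBlock approach, assuming the System.Threading.Tasks.Dataflow library is available (on .NET Framework it is a separate NuGet package). The class name ActionBlockSketch, the Consume method, and the int items are hypothetical; in the question's code the producer would call Post() on the block instead of adding to the BlockingCollection.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ActionBlockSketch
{
    // Posts itemCount items to an ActionBlock and returns how many were handled.
    public static int Consume(int itemCount, int maxWorkers)
    {
        int handled = 0;

        var block = new ActionBlock<int>(
            item =>
            {
                // Real code would dispatch on the item type here.
                Interlocked.Increment(ref handled);
            },
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = maxWorkers // default is 1
            });

        // The producer side: Post() queues the item and returns immediately.
        for (int i = 0; i < itemCount; i++) block.Post(i);

        block.Complete();        // signal that no more items will arrive
        block.Completion.Wait(); // wait until every queued item has been handled
        return handled;
    }

    public static void Main()
    {
        Console.WriteLine(Consume(100, 4)); // prints 100
    }
}
```

While the block has nothing to process it holds no thread at all; a worker Task is scheduled only once items arrive, which is the difference from the blocking loop described above.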