Petter T - 2 months ago
C# Question

TPL Dataflow async scheduling

The scheduling of async Tasks does not work as I expected in TPL Dataflow. In the example below, I expected the ActionBlock to process data from the TransformBlock as soon as it is available. Instead, it waits for the second (delayed) result before it proceeds to the third. What have I misunderstood here? Is there some requirement on the order of processing?

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class TestDataFlow
{
    public System.Diagnostics.Stopwatch watch = new System.Diagnostics.Stopwatch();

    public async Task Flow()
    {
        watch.Start();

        // Adds 10 to each input; input 2 is artificially delayed by 5 seconds.
        var plus10 = new TransformBlock<int, int>(async input =>
        {
            if (input == 2)
            {
                await Task.Delay(5000);
            }
            Console.WriteLine("Exiting plus10 for input {0} @ {1}", input, watch.Elapsed);
            return input + 10;
        },
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4,
        });

        // Prints each result as it arrives from plus10.
        var printSolution = new ActionBlock<int>(input =>
        {
            Console.WriteLine("Solution: {0} @ {1}", input, watch.Elapsed.TotalMilliseconds);
        },
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4,
        });

        plus10.LinkTo(printSolution);

        List<int> inputs = new List<int> { 1, 2, 3 };
        foreach (var input in inputs)
        {
            await plus10.SendAsync(input);
        }
    }
}


Output:

Exiting plus10 for input 1 @ 115.8583
Exiting plus10 for input 3 @ 116.6973
Solution: 11 @ 126.0146
Exiting plus10 for input 2 @ 5124.4074
Solution: 12 @ 5124.9014
Solution: 13 @ 5126.4834

Answer

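TPL Dataflow preserves the order of items between a block's input and output queues, no matter how many items are processed in parallel. In your example the result for input 3 is computed almost immediately, but the TransformBlock holds it in its output buffer until the delayed result for input 2 has been passed on.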

"Because each predefined source dataflow block type guarantees that messages are propagated out in the order in which they are received, every message must be read from the source block before the source block can process the next message"

From Dataflow (Task Parallel Library)
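To see the guarantee in isolation, here is a minimal sketch of the same pipeline that records the order in which results reach the downstream block. The class name OrderingDemo, the method RunAsync and the shortened 500 ms delay are made up for illustration. The ensureOrdered flag maps to the EnsureOrdered option, which I believe is only available in newer versions of the System.Threading.Tasks.Dataflow package, so treat that part as an assumption about your environment.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class OrderingDemo
{
    // Hypothetical helper: sends 1, 2, 3 through a TransformBlock in which
    // input 2 is delayed, and returns the results in arrival order downstream.
    public static async Task<List<int>> RunAsync(bool ensureOrdered)
    {
        var received = new List<int>();

        var plus10 = new TransformBlock<int, int>(async input =>
        {
            if (input == 2)
            {
                await Task.Delay(500); // shortened delay, for illustration only
            }
            return input + 10;
        },
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4,
            EnsureOrdered = ensureOrdered, // defaults to true
        });

        // Default options process one item at a time, so the list is safe to use.
        var collect = new ActionBlock<int>(result => received.Add(result));

        plus10.LinkTo(collect, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var input in new[] { 1, 2, 3 })
        {
            await plus10.SendAsync(input);
        }

        plus10.Complete();
        await collect.Completion; // wait until everything has flowed through
        return received;
    }

    public static async Task Main()
    {
        Console.WriteLine(string.Join(", ", await RunAsync(ensureOrdered: true)));
        Console.WriteLine(string.Join(", ", await RunAsync(ensureOrdered: false)));
    }
}
```

With ensureOrdered set to true the results arrive as 11, 12, 13, even though 13 was computed long before 12. With it set to false, the delayed 12 should arrive last, while the relative order of 11 and 13 is not guaranteed.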

If you want items to move on to the next block as soon as they finish processing, you should transfer them explicitly yourself, which turns your TransformBlock into an ActionBlock:

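// Same options as in the question, shared by both blocks.
var executionDataflowBlockOptions = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 4,
};

var printSolution = new ActionBlock<int>(input =>
{
    Console.WriteLine("Solution: {0} @ {1}", input, watch.Elapsed.TotalMilliseconds);
}, executionDataflowBlockOptions);

var plus10 = new ActionBlock<int>(async input =>
{
    if (input == 2)
    {
        await Task.Delay(5000);
    }
    Console.WriteLine("Exiting plus10 for input {0} @ {1}", input, watch.Elapsed);
    // Forward the result manually instead of relying on LinkTo, so it is not held back for ordering.
    await printSolution.SendAsync(input + 10);
}, executionDataflowBlockOptions);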
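One thing to watch with manual forwarding: without LinkTo there is no link to propagate completion, so the blocks have to be completed by hand in order. Below is a minimal, self-contained sketch of that shutdown sequence. The class name ManualForwardingDemo, the method RunAsync, the results list and the shortened 500 ms delay are all made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ManualForwardingDemo
{
    // Hypothetical helper: returns the results in the order they were printed.
    public static async Task<List<int>> RunAsync()
    {
        var results = new List<int>();

        // Default options (one item at a time), so the list needs no locking.
        var printSolution = new ActionBlock<int>(result =>
        {
            Console.WriteLine("Solution: {0}", result);
            results.Add(result);
        });

        var plus10 = new ActionBlock<int>(async input =>
        {
            if (input == 2)
            {
                await Task.Delay(500); // shortened delay, for illustration only
            }
            // Hand the result over as soon as it is ready.
            await printSolution.SendAsync(input + 10);
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        foreach (var input in new[] { 1, 2, 3 })
        {
            await plus10.SendAsync(input);
        }

        // Complete the first block and wait for it before completing the second;
        // otherwise printSolution could refuse results that are still in flight.
        plus10.Complete();
        await plus10.Completion;
        printSolution.Complete();
        await printSolution.Completion;

        return results;
    }

    public static async Task Main()
    {
        var results = await RunAsync();
        Console.WriteLine(string.Join(", ", results));
    }
}
```

The delayed result 12 should now come out last instead of blocking 13. Awaiting plus10.Completion before calling printSolution.Complete() is the important part of the sequence.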