madebybear madebybear -4 years ago 189
C# Question

Parallel.For SendAsync to BufferBlock to Async Transform?

I am learning about the TPL Dataflow Library. So far it's exactly what I was looking for.

I've created a simple class (below) that performs the following functions:


  • Upon execution of ImportPropertiesForBranch I go to a 3rd party API and get a list of properties.

  • An XML list is returned and deserialized into an array of property data (id, API endpoint, lastupdated). There are 400+ properties (as in houses).

  • I then use a Parallel.For to SendAsync the property data into my propertyBufferBlock.

  • The propertyBufferBlock is linked to a propertyXmlBlock (which itself is a TransformBlock).

  • The propertyXmlBlock then (asynchronously) goes back to the API (using the API endpoint supplied in the property data) and fetches the property XML for deserialization.

  • Once the XML is returned and becomes available, we can then deserialize it.

  • Later, I'll add more TransformBlocks to persist it to a data store.



So my questions are:


  • Are there any potential bottlenecks or areas of the code that could be troublesome? I'm aware that I've not included any error handling or cancellation (this is to come).

  • Is it ok to await async calls inside a TransformBlock or is this a bottleneck?

  • Although the code works, I am worried about the buffering and asynchronicity of the Parallel.For, BufferBlock and async in the TransformBlock. I'm not sure it's the best way and I may be mixing up some concepts.



Any guidance, improvements and pitfall advice welcomed.

using System.Diagnostics;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using My.Interfaces;
using My.XmlService.Models;

namespace My.ImportService
{
    public class ImportService
    {
        private readonly IApiService _apiService;
        private readonly IXmlService _xmlService;
        private readonly IRepositoryService _repositoryService;

        public ImportService(IApiService apiService,
            IXmlService xmlService,
            IRepositoryService repositoryService)
        {
            _apiService = apiService;
            _xmlService = xmlService;
            _repositoryService = repositoryService;

            ConstructPipeline();
        }

        private BufferBlock<propertiesProperty> propertyBufferBlock;
        private TransformBlock<propertiesProperty, string> propertyXmlBlock;
        private TransformBlock<string, propertyType> propertyDeserializeBlock;
        private ActionBlock<propertyType> propertyCompleteBlock;

        public async Task<bool> ImportPropertiesForBranch(string branchName, int branchUrlId)
        {
            var propertyListXml = await _apiService.GetPropertyListAsync(branchUrlId);

            if (string.IsNullOrEmpty(propertyListXml))
                return false;

            var properties = _xmlService.DeserializePropertyList(propertyListXml);

            if (properties?.property == null || properties.property.Length == 0)
                return false;

            // limited to the first 20 for testing
            Parallel.For(0, 20,
                new ParallelOptions {MaxDegreeOfParallelism = 3},
                i => propertyBufferBlock.SendAsync(properties.property[i]));

            propertyBufferBlock.Complete();

            await propertyCompleteBlock.Completion;

            return true;
        }

        private void ConstructPipeline()
        {
            propertyBufferBlock = GetPropertyBuffer();
            propertyXmlBlock = GetPropertyXmlBlock();
            propertyDeserializeBlock = GetPropertyDeserializeBlock();
            propertyCompleteBlock = GetPropertyCompleteBlock();

            propertyBufferBlock.LinkTo(
                propertyXmlBlock,
                new DataflowLinkOptions {PropagateCompletion = true});

            propertyXmlBlock.LinkTo(
                propertyDeserializeBlock,
                new DataflowLinkOptions {PropagateCompletion = true});

            propertyDeserializeBlock.LinkTo(
                propertyCompleteBlock,
                new DataflowLinkOptions {PropagateCompletion = true});
        }

        private BufferBlock<propertiesProperty> GetPropertyBuffer()
        {
            return new BufferBlock<propertiesProperty>();
        }

        private TransformBlock<propertiesProperty, string> GetPropertyXmlBlock()
        {
            return new TransformBlock<propertiesProperty, string>(async propertiesProperty =>
                {
                    Debug.WriteLine($"getting xml {propertiesProperty.prop_id}");
                    var propertyXml = await _apiService.GetXmlAsStringAsync(propertiesProperty.url);
                    return propertyXml;
                },
                new ExecutionDataflowBlockOptions
                {
                    MaxDegreeOfParallelism = 1,
                    BoundedCapacity = 2
                });
        }

        private TransformBlock<string, propertyType> GetPropertyDeserializeBlock()
        {
            return new TransformBlock<string, propertyType>(xmlAsString =>
                {
                    Debug.WriteLine($"deserializing");
                    var propertyType = _xmlService.DeserializeProperty(xmlAsString);
                    return propertyType;
                },
                new ExecutionDataflowBlockOptions
                {
                    MaxDegreeOfParallelism = 1,
                    BoundedCapacity = 2
                });
        }

        private ActionBlock<propertyType> GetPropertyCompleteBlock()
        {
            return new ActionBlock<propertyType>(propertyType =>
                {
                    Debug.WriteLine($"complete {propertyType.id}");
                    Debug.WriteLine(propertyType.address.display);
                },
                new ExecutionDataflowBlockOptions
                {
                    MaxDegreeOfParallelism = 1,
                    BoundedCapacity = 2
                });
        }
    }
}

Answer Source

Are there any potential bottlenecks or areas of the code that could be troublesome?

In general your approach looks good. The potential bottleneck is that you are limiting parallel processing in your blocks with MaxDegreeOfParallelism = 1. Based on your description of the problem, each item can be processed independently of the others, so you can process multiple items at a time.
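As a rough illustration of raising the limit, here is a minimal sketch, assuming a made-up FetchXmlAsync helper that stands in for the real API call and arbitrary values of 4 and 8 for the block options. It is not the asker's service code, only the shape of the configuration:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-in for the real API call: waits briefly, then returns fake XML.
static async Task<string> FetchXmlAsync(int id)
{
    await Task.Delay(100);
    return $"<property id=\"{id}\" />";
}

var fetchBlock = new TransformBlock<int, string>(
    id => FetchXmlAsync(id),
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 4, // up to four fetches in flight at once (assumed value)
        BoundedCapacity = 8         // still bounded, so the producer feels back-pressure
    });

// The sink keeps the default MaxDegreeOfParallelism of 1, so a plain List is safe here.
var results = new List<string>();
var collectBlock = new ActionBlock<string>(xml => results.Add(xml));

fetchBlock.LinkTo(collectBlock, new DataflowLinkOptions { PropagateCompletion = true });

for (var id = 1; id <= 8; id++)
    await fetchBlock.SendAsync(id);

fetchBlock.Complete();
await collectBlock.Completion;

Console.WriteLine($"fetched {results.Count} documents");
```

With the default options a TransformBlock still emits results in input order, so raising the degree of parallelism should not reorder items for the downstream blocks.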

Is it ok to await async calls inside a TransformBlock or is this a bottleneck?

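It is perfectly fine, because TPL Dataflow supports async operations: a TransformBlock accepts a delegate that returns a Task, and the block waits for that task before it emits the result.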

Although the code works, I am worried about the buffering and asynchronicity of the Parallel.For, BufferBlock and async in the TransformBlock. I'm not sure it's the best way and I may be mixing up some concepts.

One potential problem in your code that could make you shoot yourself in the foot is calling an async method inside Parallel.For and then calling propertyBufferBlock.Complete(). The problem is that Parallel.For does not support async actions: the way you invoke it, it calls propertyBufferBlock.SendAsync and moves on without waiting for the returned task to complete. That means that by the time Parallel.For exits, some sends might still be pending and their items not yet added to the buffer block. If you then call propertyBufferBlock.Complete(), those pending sends are declined rather than added (the task returned by SendAsync completes with a result of false instead of throwing), so the items never reach the pipeline, and because nothing observes the returned tasks you will not notice the loss.
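The effect is easiest to see on a small bounded block. This is only a sketch under assumptions: a BufferBlock with BoundedCapacity = 1 is used so that the second send really is postponed (an unbounded BufferBlock, like the one in the question, normally accepts each item during the SendAsync call itself). Going by the documented SendAsync contract, a send that the block releases or declines completes with false rather than throwing:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var buffer = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 1 });

bool firstAccepted = await buffer.SendAsync(1); // fits into the single free slot
Task<bool> pendingSend = buffer.SendAsync(2);   // block is full, so this send is postponed

buffer.Complete();                              // completing while a send is still pending

bool pendingAccepted = await pendingSend;       // resolves once the block gives up the postponed message
bool lateAccepted = await buffer.SendAsync(3);  // offered after Complete(): declined

Console.WriteLine($"first: {firstAccepted}, pending: {pendingAccepted}, late: {lateAccepted}");
```

Checking the boolean result of every SendAsync (or awaiting all the tasks before calling Complete()) is what makes a dropped item visible.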

You could use ForEachAsync from this blog post to ensure that all items are added to the block before completing it. But if you are still limiting processing to one operation at a time, you can simply add the items one at a time. I am not sure how propertyBufferBlock.SendAsync is implemented, but it may be that it internally restricts adding to one item at a time, in which case adding in parallel does not make any sense.
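Adding the items one at a time could look roughly like the sketch below. The ActionBlock, the 10 ms delay and the counter are placeholders for the real pipeline, not part of the original code. The point is only that each SendAsync is awaited before the next one starts, so no send is still pending when Complete() is called:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var processed = 0;
var worker = new ActionBlock<int>(
    async item =>
    {
        await Task.Delay(10);                 // stand-in for the real work
        Interlocked.Increment(ref processed);
    },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 2 });

foreach (var item in Enumerable.Range(0, 20))
{
    // Awaiting each send waits for free capacity, so nothing is left pending afterwards.
    var accepted = await worker.SendAsync(item);
    if (!accepted) throw new InvalidOperationException($"Item {item} was declined.");
}

worker.Complete();       // safe: every send has already finished
await worker.Completion; // wait for the queued items to drain

Console.WriteLine($"processed {processed} items");
```

Because the block is bounded, the awaited SendAsync also gives back-pressure for free: the loop only advances as fast as the block can take items.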
