lapsus lapsus - 3 months ago 31
C# Question

Batch processor (aggregate items out of a queue)

I have a System.Timers.Timer which elapses every 3 seconds.

Once it elapses I want to take all the items in my collection and process them in one batch.

The motivation for that is to reduce the number of I/Os on the backend system.

The challenge is that I have multiple concurrent threads appending to the collection/queue. Because of this I thought about using a ConcurrentQueue<T>, but that's a bad choice.

This article on social MSDN describes the problem very well.

What I need is a collection/queue where I can get all data at once (ToArray()) and clear the queue in one atomic operation, so that I don't lose any data written to the collection/queue by other threads in the meantime.

private static void T1_Elapsed(object sender, ElapsedEventArgs e)
{
    string[] result = _queue.ToArray();
    // Strings enqueued between ToArray() and this assignment will be lost :-)
    _queue = new ConcurrentQueue<string>();
}


I tend to use a simple lock-based approach on a simple Queue<T>.

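private static readonly object _myLock = new object();
private static readonly Queue<string> _queue = new Queue<string>();

private static void T1_Elapsed(object sender, ElapsedEventArgs e)
{
    string[] result;
    lock (_myLock)
    {
        // Copy and clear under the same lock so no item can slip in between.
        result = _queue.ToArray();
        _queue.Clear();
    }
    // Process result here, outside the lock.
}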


Now this piece of code has one obvious flaw which can be seen in the producer code:

private static void ProduceItems()
{
    //while (!_stop)
    for (int i = 0; i < int.MaxValue; i++)
    {
        if (_stop) break;

        lock (_myLock) // bad. locks out other producers running on other threads.
        {
            Console.WriteLine("Enqueue " + i);
            _queue.Enqueue("string" + i);
        }

        Thread.Sleep(1000); // FOR DEBUGGING PURPOSES ONLY
    }
}


Of course, this code locks out any other producers trying to append to the queue. Is there any way to make producers wait only while "T1_Elapsed" holds the lock, rather than blocking each other?
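One possible way to keep the time spent inside the lock very short is sketched below. It is only an illustration under assumptions: the class name SwapBatcher and its members are hypothetical and not part of the original code. Producers hold the lock for a single List<T>.Add, and the timer handler swaps in a fresh list under the same lock instead of copying and clearing, so the batch is processed outside the lock.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper for illustration; not part of the original post.
public sealed class SwapBatcher<T>
{
    private readonly object _gate = new object();
    private List<T> _items = new List<T>();

    // Producers hold the lock only for one List<T>.Add call.
    public void Add(T item)
    {
        lock (_gate)
        {
            _items.Add(item);
        }
    }

    // The consumer swaps in an empty list under the lock and returns the old one.
    // No item can be added between "take everything" and "start a new batch".
    public List<T> Drain()
    {
        List<T> batch;
        lock (_gate)
        {
            batch = _items;
            _items = new List<T>();
        }
        return batch;
    }
}

public static class SwapBatcherDemo
{
    public static void Main()
    {
        var batcher = new SwapBatcher<string>();

        // Four concurrent producers, 1000 items each.
        Parallel.For(0, 4, p =>
        {
            for (int i = 0; i < 1000; i++)
            {
                batcher.Add($"p{p}-{i}");
            }
        });

        // In the real program this call would live in the T1_Elapsed handler.
        List<string> batch = batcher.Drain();
        Console.WriteLine($"Drained {batch.Count} items.");
    }
}
```

Producers still serialize on the lock, but only for the duration of one Add, so slow work such as Console.WriteLine or the backend I/O stays outside the critical section.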

Is there anything more suitable to my problem? Maybe anything Observable? Or are there any good "batcher/aggregator" examples?

UPDATE 1: RX

Awesome what you can do with RX :)

I'm still looking into how I can handle errors, retries or re-enqueues in this scenario.

internal class Rx
{
    internal static void Start()
    {
        ISubject<int> subject = new Subject<int>();
        ISubject<int> syncedSubject = Subject.Synchronize(subject); // that should do it? - UNTESTED!

        var subscription = syncedSubject.Buffer(TimeSpan.FromSeconds(5), 10)
            .Subscribe((item) => ProcessBatch(item));

        for (int i = 1; i < int.MaxValue; i++)
        {
            syncedSubject.OnNext(i);
            Thread.Sleep(200);
            Console.WriteLine($"Produced {i}.");
        }

        Console.ReadKey();
        subscription.Dispose();
    }

    private static void ProcessBatch(IList<int> list)
    {
        // Aggregate many into one
        string joined = string.Join(" ", list);

        // Process one
        Console.WriteLine($"Wrote {joined} to remote storage.");

        // how do you account for errors here?
        myProducer.ReEnqueueMyFailedItems(list); // ?
    }
}

Answer

TPL Dataflow

I'd say give the TPL Dataflow library a go. It is built upon the Task Parallel Library and designed for these kinds of requirements, where concurrency plays a big role. See http://blog.stephencleary.com/2012/09/introduction-to-dataflow-part-1.html for a series of blog posts about this library.

The BatchBlock seems like a good fit for your scenario. See https://msdn.microsoft.com/en-us/library/hh228602(v=vs.110).aspx for a tutorial.

Another example of using the BatchBlock: https://taskmatics.com/blog/simplifying-producer-consumer-processing-with-tpl-dataflow-structures/

Instead of posting data to a queue you will post to one of the available TPL Dataflow blocks.
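As a minimal sketch of what that could look like (assuming the System.Threading.Tasks.Dataflow NuGet package; the batch size of 100, the 3-second period and the class name BatchBlockDemo are illustrative choices, not taken from the question): a BatchBlock<T> groups posted items into arrays, and a timer calls TriggerBatch() to flush whatever has accumulated even if the batch is not yet full.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet package: System.Threading.Tasks.Dataflow

public static class BatchBlockDemo
{
    public static async Task Main()
    {
        // Emits a string[] as soon as 100 items are queued (assumed size).
        var batchBlock = new BatchBlock<string>(100);

        // Receives each batch; stands in for the single backend call.
        var processBlock = new ActionBlock<string[]>(batch =>
            Console.WriteLine($"Processing {batch.Length} items in one call."));

        batchBlock.LinkTo(processBlock,
            new DataflowLinkOptions { PropagateCompletion = true });

        // Every 3 seconds, flush a partial batch if one is pending.
        using (var timer = new Timer(_ => batchBlock.TriggerBatch(), null,
                   TimeSpan.FromSeconds(3), TimeSpan.FromSeconds(3)))
        {
            // Post may be called from several threads concurrently.
            Parallel.For(0, 4, p =>
            {
                for (int i = 0; i < 50; i++)
                {
                    batchBlock.Post($"p{p}-{i}");
                }
            });

            // Complete flushes any remaining items as a final batch.
            batchBlock.Complete();
            await processBlock.Completion;
        }
    }
}
```

The timer here only flushes partial batches; the block itself takes care of handing each item to exactly one batch, so there is no separate "copy then clear" step.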

Another option could be using

Reactive Extensions

See http://www.introtorx.com/uat/content/v1.0.10621.0/01_WhyRx.html for a good introduction.

It provides batching support as well:

void Sample()
{
    var dataprovider = new Subject<int>();

    var subscription = dataprovider
        .Buffer(TimeSpan.FromMinutes(3))
        .Subscribe(listOfNumbers => 
        {
            // do something with batch of items
            var batchSize = listOfNumbers.Count;
        });

    for(int i = 0; i <= 5; ++i)
    {
        dataprovider.OnNext(i);
    }

    subscription.Dispose();
}

The example above needs some modifications before multiple producers on different threads can add data, because calls to OnNext on a Subject must not overlap; see reactive extension OnNext. It is simplified code(!), but it gives you a general idea of the concept of using Rx.

Buffering can be done using a maximum buffer size, a given time period, or a combination of both, so it can replace your timer as well.

Instead of adding items to a queue you call OnNext on the Subject
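A rough sketch of the multi-producer variant, assuming the System.Reactive NuGet package: Subject.Synchronize (also used in the question's update) serializes OnNext calls coming from different threads, and the Buffer overload taking both a TimeSpan and a count closes a batch on whichever limit is hit first. The 3-second window, the count of 100 and the class name RxBufferDemo are illustrative assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;      // NuGet package: System.Reactive
using System.Reactive.Subjects;
using System.Threading.Tasks;

public static class RxBufferDemo
{
    public static void Main()
    {
        var subject = new Subject<int>();

        // Serializes OnNext calls so several threads can produce safely.
        ISubject<int> synced = Subject.Synchronize(subject);

        IDisposable subscription = synced
            .Buffer(TimeSpan.FromSeconds(3), 100)  // close after 3 s or 100 items
            .Where(batch => batch.Count > 0)        // timed buffers can be empty
            .Subscribe(batch =>
                Console.WriteLine($"Batch of {batch.Count} items."));

        // Four concurrent producers, 50 items each.
        Parallel.For(0, 4, p =>
        {
            for (int i = 0; i < 50; i++)
            {
                synced.OnNext(p * 50 + i);
            }
        });

        // Completing the subject flushes the last partial buffer;
        // disposing the subscription alone would discard it.
        synced.OnCompleted();
        subscription.Dispose();
    }
}
```

Note that the batch callback may run on a producer thread (count limit reached) or on a timer thread (time limit reached), so slow backend calls inside Subscribe can hold up producers unless the work is moved to another scheduler.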

Both TPL Dataflow and Rx eliminate the need for a queue or similar structure that has to be cleared, so they free you from that pain.
