lapsus lapsus - 1 month ago 21x
C# Question

Batch processor (aggregate items out of a queue)

I have a

which elapses every 3 seconds.

Once it elapses I want to take all the items in my collection and process them in one batch.

The motivation for that is to reduce the number of I/Os on the backend system.

The challenge is that I have multiple concurrent threads appending to the collection/queue. And because of this I thought about using a
- but that's a bad choice.

This article on social msdn describes the problem here very good.

What I need is a collection/queue where I can get all data at once (ToArray()) and clearing the queue in one atomic operation so that I don't lose any data written to the collection/queue by other threads in the meantime.

private static void T1_Elapsed(object sender, ElapsedEventArgs e)
string[] result = _queue.ToArray();
_queue = new ConcurrentQueue<string>(); // strings will be lost :-)

I tend to use a simple lock-based approach on a simple

private static readonly object _myLock = new object();

private static void T1_Elapsed(object sender, ElapsedEventArgs e)
string[] result;
lock (_myLock)
result = _queue.ToArray();

Now this piece of code has one obvious flaw which can be seen in the producer code:

private static void ProduceItems()
//while (!_stop)
for(int i=0; i<int.MaxValue; i++)
if (_stop) break;

lock (_myLock) // bad. locks out other producers running on other threads.
Console.WriteLine("Enqueue " + i);
_queue.Enqueue("string" + i);


Of course this code will lock out any other producers trying to append to the queue. Is there any way I can only validate the lock in the producers if the "T1_Elapsed" lock has been set?

Is there anything more suitable to my problem? Maybe anything Observable? Or are there any good "batcher/aggregator" examples?


Awesome what you can do with RX :)

I'm still looking into how I can handle errors, retries or re-enqueues in this scenario.

internal class Rx
internal static void Start()
ISubject<int> subject = new Subject<int>();
ISubject<int> syncedSubject = Subject.Synchronize(subject); // that should do it? - UNTESTED!

var subscription = syncedSubject.Buffer(TimeSpan.FromSeconds(5), 10)
.Subscribe((item) => ProcessBatch(item));

for (int i=1; i<int.MaxValue; i++)
Console.WriteLine($"Produced {i}.");


private static void ProcessBatch(IList<int> list)
// Aggregate many into one
string joined = string.Join(" ", list);

// Process one
Console.WriteLine($"Wrote {joined} to remote storage.");

// how do you account for errors here?
myProducer.ReEnqueueMyFailedItems(list); // ?


TPL DataFlow

I'd say give the TPL DataFlow library a go. It is build upon the Task Paralled Library and designed for these kind of requirements where concurrency plays a big role. See for a series of blog posts about this library.

The BatchBlock seems like a good fit for your scenario. See for a tutorial.

Another example of using the BatchBlock:

Instead of posting data to a queue you will post to one of the available TPL Dataflow blocks.

another option could be using

Reactive Extensions

See for a good introduction

It provides batching support as well:

void Sample()
    var dataprovider = new Subject<int>();

    var subscription = dataprovider
        .Subscribe(listOfNumbers => 
            // do something with batch of items
            var batchSize = listOfNumbers.Count;

    for(int i = 0; i <= 5; ++i)


In the above example, you need some modifications to enable multiple producers from different threads to add data, see reactive extension OnNext. It is simplified code(!) but it gives you a general idea of the concept of using RX.

Buffering can be done using a max buffer size, a given timeperiod or a combination of both. So it can replace your timer as well.

Instead of adding items to a queue you call OnNext on the Subject

Both TPL DataFlow and RX eliminate the use of a queue or something alike that needs to be cleared, so it will free you from that pain.