Roy - 4 months ago
C# Question

BlockingCollection and Dictionary

I have a question about writing code that uses BlockingCollection and Dictionary.

My goal is to read a bunch of text files and process them in parallel. The processed data would be stored in a BlockingCollection instance so that a consumer task can write it to files.

I want to use BlockingCollection for two reasons:

(1) to save time: while GenerateDataFiles() is doing CPU-intensive work, the consumer Task can do the IO-related work in the meantime, and

(2) to reduce memory usage compared to storing all the processed data in a list before writing any of it to a file.

For (2), if I store all the data before writing it to files, the memory consumption is more than my desktop can handle (the program reads more than 30 GB of data), so I have to use this producer-consumer approach.

I also have trouble inserting the key-value pairs into the BlockingCollection instance (or the dictionary). Please show the correct way to insert the data.

The following code is my attempt to address the problem. I may have made some mistakes since I am new to BlockingCollection. Please suggest changes (and amended code) so that I can resolve the problem.

class SampleClass
{
    static void Main(string[] args)
    {
        SampleClass sampleClass = new SampleClass();
        sampleClass.run();
    }

    private void run()
    {
        Task consumer = Task.Factory.StartNew(() => WriteDataToFiles());
        GenerateDataFiles();
    }

    BlockingCollection<Dictionary<string, List<string>>> bc = new BlockingCollection<Dictionary<string, List<string>>>();

    private void GenerateDataFiles()
    {
        DirectoryInfo directory = new DirectoryInfo(@"D:\Data\");
        FileInfo[] array_FileInfo = directory.GetFiles("*.txt", SearchOption.TopDirectoryOnly);

        Parallel.ForEach(array_FileInfo, fileInfo =>
        {
            string[] array_Lines = File.ReadAllLines(fileInfo.FullName);

            // do some CPU-intensive data parsing and then add the processed data to the blocking collection
            // It has to be inserted in pairs (key = file path, value = list of strings to be written to this file)

        });
    }

    private void WriteDataToFiles()
    {
        foreach (var item in bc.GetConsumingEnumerable())
        {
            foreach (var key in item.Keys)
            {
                File.WriteAllLines(key, item[key]);
            }
        }
    }
}

Answer

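Since each item is a single (file path, lines) pair, consider using a Tuple instead of a Dictionary as the element type of the BlockingCollection. Additionally, you need to call CompleteAdding() once the producers are done; otherwise the foreach over GetConsumingEnumerable() in WriteDataToFiles never ends.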
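
To see the effect of CompleteAdding() in isolation, here is a minimal in-memory sketch. The class name CompleteAddingDemo and the "outN.txt" paths are made up for illustration and nothing is written to disk; it only counts how many items the consumer receives.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

static class CompleteAddingDemo
{
    // Adds three (path, lines) pairs, signals completion, and returns
    // the number of items the consumer received.
    public static int Run()
    {
        var bc = new BlockingCollection<Tuple<string, List<string>>>();
        int consumed = 0;

        // Consumer: this loop ends only after CompleteAdding() has been
        // called and the collection has been drained.
        Task consumer = Task.Run(() =>
        {
            foreach (var item in bc.GetConsumingEnumerable())
            {
                consumed++;
            }
        });

        for (int i = 0; i < 3; i++)
        {
            // Hypothetical output path, used only as a key here.
            bc.Add(Tuple.Create("out" + i + ".txt", new List<string> { "line " + i }));
        }

        bc.CompleteAdding();   // without this call, consumer.Wait() would never return
        consumer.Wait();
        return consumed;
    }

    static void Main()
    {
        Console.WriteLine(Run());   // prints 3
    }
}
```

Applied to the code from the question, the same two changes look like this: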

BlockingCollection<Tuple<string, List<string>>> bc = new BlockingCollection<Tuple<string, List<string>>>();

private void GenerateDataFiles()
{
    DirectoryInfo directory = new DirectoryInfo(@"D:\Data\");
    FileInfo[] array_FileInfo = directory.GetFiles("*.txt", SearchOption.TopDirectoryOnly);

    Parallel.ForEach(array_FileInfo, fileInfo => 
    {
        string[] array_Lines = File.ReadAllLines(fileInfo.FullName);

        // do some CPU-intensive data parsing and then add the processed data to the blocking collection
        // It has to be inserted in pairs (key = file path, value = list of strings to be written to this file)
        List<string> processedData = new List<string>();  // ... and add content
        bc.Add(new Tuple<string, List<string>>(fileInfo.FullName, processedData));
    });
    bc.CompleteAdding();
}

private void WriteDataToFiles()
{
    foreach (var tuple in bc.GetConsumingEnumerable())
    {
        File.WriteAllLines(tuple.Item1, tuple.Item2);
    }
}
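
Two points are still open with respect to the goals in the question. First, a BlockingCollection created with the parameterless constructor has no upper bound, so it does not cap memory if parsing is faster than writing; passing a bounded capacity makes Add() block until the consumer catches up. Second, run() in the question never waits for the consumer task, which runs on a background thread-pool thread, so the process can exit before all files are written. The following is a sketch of one way to handle both, not a definitive implementation. The names BoundedPipeline and Process, the capacity of 4, the separate output directory, and the upper-casing that stands in for the real parsing are all assumptions.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;

static class BoundedPipeline
{
    // Reads every *.txt file in inputDir, transforms it, and writes the result
    // to a file of the same name in outputDir. Returns the number of files written.
    public static int Process(string inputDir, string outputDir, int capacity = 4)
    {
        Directory.CreateDirectory(outputDir);

        // Bounded collection: Add() blocks while 'capacity' items are waiting.
        using (var bc = new BlockingCollection<Tuple<string, List<string>>>(capacity))
        {
            int written = 0;

            Task consumer = Task.Run(() =>
            {
                foreach (var item in bc.GetConsumingEnumerable())
                {
                    File.WriteAllLines(item.Item1, item.Item2);
                    written++;
                }
            });

            // Limit the number of files being parsed at once, so that blocked
            // producers do not cause ever more files to be loaded into memory.
            var options = new ParallelOptions
            {
                MaxDegreeOfParallelism = Environment.ProcessorCount
            };

            try
            {
                Parallel.ForEach(Directory.EnumerateFiles(inputDir, "*.txt"), options, path =>
                {
                    // Placeholder for the CPU-intensive parsing step.
                    List<string> processed = File.ReadLines(path)
                        .Select(line => line.ToUpperInvariant())
                        .ToList();

                    string target = Path.Combine(outputDir, Path.GetFileName(path));
                    bc.Add(Tuple.Create(target, processed));
                });
            }
            finally
            {
                // Always release the consumer, even if a producer threw.
                bc.CompleteAdding();
            }

            consumer.Wait();   // keep running until every queued item is written
            return written;
        }
    }

    static void Main(string[] args)
    {
        // Hypothetical usage: BoundedPipeline.exe <inputDir> <outputDir>
        Console.WriteLine(Process(args[0], args[1]) + " files written");
    }
}
```

With this shape, roughly capacity plus MaxDegreeOfParallelism processed files are held in memory at any time. One caveat: if the consumer fails with an IO error while the collection is full, producers blocked in Add() would wait forever; the Add overload that takes a CancellationToken is one way to guard against that.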