A.H A.H - 1 month ago 11
C# Question

C# Multithreading for three threads working in parallel and sending data to each other and working in order

I have a C# windows form program that uses three threads.
*The first thread is responsible for reading files from input folder and reading all text from each file then send data as string to the second thread using a Queue.
*The second thread is responsible on parsing the string got from the first thread, if the string was a number then send this number to the third thread also using a Queue.
*The third thread is responsible on creating file for the number got from thread 2.
Also I should have a Timer that runs thread 1 every 4 seconds for example until pressing stop.( should thread 2 and thread 3 when thread 1 runs after 4 seconds ?)
Note That the three threads should be running in parallel and should be in the correct order Thread 1 then Thread 2 then Thread 3 and should be getting and sending data for each other.
Below is an image for what i described below and some of the code that I used in my program.
Please I want your help I'm stuck in this ....
enter image description here

Code is :


form1.cs Class


using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Forms;
namespace Mltithreading
{
public partial class Form1 : Form
{
public ConcurrentQueue<string> _queue = new ConcurrentQueue<string>();
public Queue<string> Queue1result = new Queue<string>();
public Queue<string> TestQueue = new Queue<string>();
public Queue<string> Queue2result = new Queue<string>();
public Form1()
{
InitializeComponent();
}
/* private Timer timer1;
public void InitTimer()
{
timer1 = new Timer();
timer1.Tick += new EventHandler(timer1_Tick);
timer1.Interval = 2000; // in miliseconds
timer1.Start();
} */
private void timer1_Tick(object sender, EventArgs e)
{
isonline();
}
public void isonline()
{

//
// The user selected a folder and pressed the OK button.
// We print the number of files found.
//
string[] files = Directory.GetFiles(folderBrowserDialog1.SelectedPath);
MessageBox.Show(files[0]);
var a = files[1];
var dir = new DirectoryInfo(folderBrowserDialog1.SelectedPath);
FileInfo[] f = dir.GetFiles();
foreach (System.IO.FileInfo fi in f)
{
Console.WriteLine("{0}: {1}: {2}", fi.Name, fi.LastAccessTime, fi.Length);
}
MessageBox.Show("Files found: " + files.Length.ToString(), "Message");

}
private void button1_Click(object sender, EventArgs e)
{

DialogResult result = folderBrowserDialog1.ShowDialog();
if (result == DialogResult.OK)
{
//
// The user selected a folder and pressed the OK button.
// We print the number of files found.
//
textBox1.Text = folderBrowserDialog1.SelectedPath;
/* try
{
string[] files = Directory.GetFiles(folderBrowserDialog1.SelectedPath);
var dir = new DirectoryInfo(folderBrowserDialog1.SelectedPath);
var output = folderBrowserDialog2.SelectedPath;

FileInfo[] f = dir.GetFiles();
Queue<FileInfo> Q = new Queue<FileInfo>(f);

while (Q.Count > 0)
{
FileInfo current = Q.Dequeue();
Console.WriteLine(current);
// Process 'current'
}
foreach (System.IO.FileInfo fi in f)
{
Console.WriteLine("{0}: {1}: {2}", fi.Name, fi.LastAccessTime, fi.Length);
}
MessageBox.Show("Files found: " + files.Length.ToString(), "Message");
}
catch (Exception excep)
{
Console.WriteLine("An error occurred: '{0}'", excep);
} */
}
}

private void Form1_Load(object sender, EventArgs e)
{

}

private void button2_Click(object sender, EventArgs e)
{
DialogResult result = folderBrowserDialog2.ShowDialog();
if (result == DialogResult.OK)
{
//
// The user selected a folder and pressed the OK button.
// We print the number of files found.
//
textBox2.Text = folderBrowserDialog2.SelectedPath;

}
}

private void button3_Click(object sender, EventArgs e)
{
string input = folderBrowserDialog1.SelectedPath;
string output = folderBrowserDialog2.SelectedPath;
var result = Thread1Start(input, output);
Console.WriteLine("Is thread Still Running :{0}", result);
//var firstThread = new Thread1();
var secondThread = new Thread2();
var thirdThread = new Thread3();
//Thread thread = new Thread(() => th.Readfiles(folderBrowserDialog1.SelectedPath, folderBrowserDialog2.SelectedPath));
//Thread thread1 = new Thread(delegate() { Queue1result = firstThread.Readfiles(folderBrowserDialog1.SelectedPath, folderBrowserDialog2.SelectedPath); });
//Thread thread3 = new Thread(() =>second.test(valueQueue));
Thread thread2 = new Thread(delegate() { Queue2result = secondThread.test(TestQueue); });
Thread thread3 = new Thread(() => thirdThread.CreateFiles(Queue2result,output));
//Thread thread5 = new Thread(second.run);
//thread1.Start();
// thread1.Join();
thread2.Start();
//thread2.Join();
thread3.Start();
//thread1.Start();
//thread1.Join();
//thread2.Start();
//thread1.Join(TimeSpan.Zero);
//Console.WriteLine(thread1.Join(TimeSpan.Zero));
// new System.Threading.Timer (ThreadTimer, "Thread 1 Started working...", 5000, 1000);


}

private void button4_Click(object sender, EventArgs e)
{

}
/* public void ThreadTimer (object data)
{
string input = folderBrowserDialog1.SelectedPath;
string output = folderBrowserDialog2.SelectedPath;
var result = Thread1Start(input, output);
Console.WriteLine("Is thread Still Running :{0}",result);
}
*/
public bool Thread1Start(string input, string output)
{
var firstThread = new Thread1();
Thread thread1 = new Thread(delegate() { Queue1result = firstThread.Readfiles(input, output); });
TestQueue = Queue1result;
thread1.Start();
Console.WriteLine("thread 1 start");
var state = thread1.Join(TimeSpan.Zero);
var secondThread = new Thread2();
var thirdThread = new Thread3();
//Thread thread = new Thread(() => th.Readfiles(folderBrowserDialog1.SelectedPath, folderBrowserDialog2.SelectedPath));
//Thread thread1 = new Thread(delegate() { Queue1result = firstThread.Readfiles(folderBrowserDialog1.SelectedPath, folderBrowserDialog2.SelectedPath); });
//Thread thread3 = new Thread(() =>second.test(valueQueue));
Thread thread2 = new Thread(delegate() { Queue2result = secondThread.test(Queue1result); });
Thread thread3 = new Thread(() => thirdThread.CreateFiles(Queue2result, output));
//Thread thread5 = new Thread(second.run);
//thread1.Start();
// thread1.Join();
thread2.Start();
//thread2.Join();
thread3.Start();
return state;
}
}
}



Thread1.cs class


here using System;
using System.IO;
using System.Text;
using System.Threading;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Forms;
namespace Mltithreading
{
class Thread1
{
public void run()
{
while (true)
{
int iterations = 10;
try
{
for (int i = 0; i < iterations; i++)
{
Console.WriteLine("From Thread 1");
Thread.Sleep(2000);
}
}
catch (Exception ex)
{
Console.WriteLine(ex);
}
}
}
public Queue<string> Readfiles(string inputDirectoryPath, string outputDirectoryPath)
{ Queue<string> thread1Queue = new Queue<string>();
try
{
var dir = new DirectoryInfo(inputDirectoryPath);
FileInfo[] f = dir.GetFiles();

foreach (System.IO.FileInfo fi in f)
{
var filePath = fi.DirectoryName + "\\" + fi.Name;
//File.Copy(filePath, outputDirectoryPath + "\\" + fi.Name);
// Open the file to read from.
string readText = File.ReadAllText(filePath);
thread1Queue.Enqueue(readText);
Console.WriteLine(readText);
Console.WriteLine("thread 1 call read files method ");
//Console.WriteLine("{0}: {1}: {2}", fi.Name, fi.LastAccessTime, fi.Length);
}


}
catch (Exception excep)
{
Console.WriteLine("An error occurred: '{0}'", excep);
}
return thread1Queue;
}


}
}



Thread2.cs class


using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Mltithreading
{
class Thread2
{
public void run()
{
while (true)
{
int iterations = 10;
try
{
for (int i = 0; i < iterations; i++)
{
Console.WriteLine("From Thread 2");
Thread.Sleep(2000);
}
}
catch (Exception ex)
{
Console.WriteLine(ex);
}
}
}

public Queue<string> test(Queue<string> list)
{
Queue<string> thread2Queue = new Queue<string>();
while (list.Count > 0)
{
try
{
var a = list.Dequeue();
Console.WriteLine(a);
int j;
if (Int32.TryParse(a, out j))
{
Console.WriteLine("parsed the string {0} to integer and sent to thread 3", j);
thread2Queue.Enqueue(a);
}
else
Console.WriteLine("String could not be parsed to integer.");
Console.WriteLine("thread 2 parsing string from thread 1");
}
catch (Exception ex)
{
Console.WriteLine(ex);
}


}
return thread2Queue;
}

}


}


Thread3.cs Class


using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Mltithreading
{
class Thread3
{
public void run()
{
while (true)
{
int iterations = 10;
try
{
for (int i = 0; i < iterations; i++)
{
Console.WriteLine("From Thread 3");
Thread.Sleep(2000);
}
}
catch (Exception ex)
{
Console.WriteLine(ex);
}
}
}
public void CreateFiles(Queue<string> list, string outputDirectoryPath)
{
while (list.Count > 0)
{
string format = "MM-dd-yyyy--HH-mm-ss";
DateTime date = DateTime.Now;
string filename = date.ToString(format);
string file = outputDirectoryPath + "\\" + filename + ".txt";
var a = list.Dequeue();
FileInfo fi = new FileInfo(file);
if (!fi.Exists)
{
fi.Create().Dispose();
System.IO.File.WriteAllText(file,a);
Console.WriteLine("creating new text file {{0}}",file);
}
else
{
Console.WriteLine("file already exists");
}
Console.WriteLine(a);
Console.WriteLine("Thread 3 creating File with date name");
Console.WriteLine(date.ToString(format));

}

}
}
}


Thank you very much for your help .

Answer

I won´t comment on all, there´s too much stuff to talk about, but let´s have a look at your first thread.

Thread thread1 = new Thread(delegate() { Queue1result = firstThread.Readfiles(input, output); });

This line will create a thread that will replace the existing instance in your field Queue1Result (have a look at naming conventions, please!) once after reading some files (if existing). If executed again, The existing queue goes out of scope and will be replaced with a new one, probably not what you want to do.

You need to create those queue instances just once (type to use is ConcurrentQueue), those should then be used as a parameter running the thread procedure. So, redeclare the queues to readonly ConcurrentQueue to create the instances for your fields(give them a good "speaking" name, btw).

public readonly ConcurrentQueue<string> _textFromFilesQueue = new ConcurrentQueue<string>(); 

Once started, you want to regularly look after files again, so your code has to contain a loop. This loop to be executed should be terminable from outside / other threads using a CancellationToken. Lets add a CancellationTokenSource to Form 1:

public CancellationTokenSource CancellationTokenSource { get; set; } 

Your execution code for the first thread could / should look like this:

    public void Readfiles(string inputDirectoryPath, string outputDirectoryPath, ConcurrentQueue<string> queue, CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            try
            {
                var dir = new DirectoryInfo(inputDirectoryPath);
                FileInfo[] f = dir.GetFiles();

                foreach (System.IO.FileInfo fi in f)
                {
                    var filePath = Path.Combine(fi.DirectoryName, fi.Name);
                    //File.Copy(filePath, outputDirectoryPath + "\\" + fi.Name);
                    // Open the file to read from.
                    string textFromFile = File.ReadAllText(filePath);
                    queue.Enqueue(textFromFile);
                    Console.WriteLine(textFromFile);
                    Console.WriteLine("thread 1 call read files method ");
                    //Console.WriteLine("{0}: {1}: {2}", fi.Name, fi.LastAccessTime, fi.Length);
                }
                Thread.Sleep(TimeSpan.FromSeconds(5));
            }
            catch (Exception excep)
            {
                Console.WriteLine("An error occurred: '{0}'", excep);
                Throw excep;
            }
        }

    }

Do not build up a path by concatenating strings, use Path.Combine instead. Also, exceptions that happened should just be rethrown to ensure correct handling from other threads / tasks.

I would recommend using the TPL mechanisms instead of threads, it grants the system more flexibility in handling the tasks to be executed in a parallel manner. Have a look here : https://msdn.microsoft.com/de-de/library/dd460717(v=vs.110).aspx

So, creating and executing this task:

        CancellationTokenSource = new CancellationTokenSource();
        Task task = new Task(() => Readfiles(input, output, _textFromFilesQueue, CancellationTokenSource.Token));
        task.Start();

Stopping the task:

        CancellationTokenSource.Cancel();

Be sure not to overwrite the TokenSource without calling Cancel(), else you are losing the ability to stop the tasks that have already started...