Vishnu V Potti - 1 month ago
Java Question

Alternative approach to processing a huge array using multithreading

I am new to multithreading in Java. I have written a multithreaded program that processes an array, and I would like help and suggestions on optimising and refactoring it.

Scenario

We get a huge CSV file with thousands of rows that we need to process.
I convert the rows to an array, split it, and pass each subset to the processing routine.
Right now I split the array into 20 equal subsets and pass them to 20 threads for execution. This takes about 2 minutes, which is fine; without multithreading it takes 30 minutes.

Help needed

A snapshot of my code is below.
Although it works fine, I am wondering whether there is a way to standardize and refactor it. Right now it looks clumsy.
To be more specific, it would be great if I could parameterize the thread runners instead of creating each one individually.

Code

private static void ProcessRecords(List<String[]> inputCSVData)
        throws ClassNotFoundException, SQLException, IOException
{
    // Do some operation
}


**In the main program**

public static void main(String[] args) throws ClassNotFoundException, SQLException, IOException, InterruptedException
{
    int size = csvData.size();
    // Split the array into 20 chunks
    int firstArraySize = size / 20;
    // subList's end index is exclusive, so the second chunk ends at firstArraySize * 2
    int secondArrayEndIndex = firstArraySize * 2;

    // Starts at index 1, which skips row 0 (presumably the CSV header)
    csvData1 = csvData.subList(1, firstArraySize);
    csvData2 = csvData.subList(firstArraySize, secondArrayEndIndex);
    // .... and so on

    Thread thread1 = new Thread(new Runnable() {
        public void run() {
            try {
                ProcessRecords(csvData1);
            } catch (ClassNotFoundException | SQLException | IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    });

    Thread thread2 = new Thread(new Runnable() {
        public void run() {
            try {
                ProcessRecords(csvData2);
            } catch (ClassNotFoundException | SQLException | IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    });

    // ... and so on, 20 times in total

    thread1.start();
    thread2.start();
    //... For all remaining threads
    // thread20.start();

    thread1.join();
    thread2.join();
    //... For all remaining threads
    // thread20.join();

}

Answer

Since Java 7, you can implement such a mechanism efficiently out of the box thanks to the Fork/Join framework. Starting with Java 8, you can do it directly with the Stream API, more precisely with a parallel stream, which uses a ForkJoinPool behind the scenes and relies on its work-stealing algorithm to keep all worker threads busy.

In your case, you could process the data row by row as follows:

csvData.parallelStream().forEach(MyClass::ProcessRecord);

Here ProcessRecord is a method of the class MyClass with the following signature:

private static void ProcessRecord(String[] inputCSVData){
    // Do some operation
}
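To make this concrete, here is a minimal, self-contained sketch of the same idea, assuming the rows are already loaded into a List<String[]>. The class name ParallelCsvDemo, the sample rows, and the body of ProcessRecord (it only counts fields) are placeholders for illustration; the real per-row work would go there. Since rows are handled concurrently and in no particular order, any state shared between calls has to be thread-safe, which is why the sketch uses an AtomicInteger.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class ParallelCsvDemo {

    // Shared counter; AtomicInteger because rows are processed on several threads.
    private static final AtomicInteger fieldCount = new AtomicInteger();

    // Placeholder for the real per-row work: here it only counts the fields.
    static void ProcessRecord(String[] row) {
        fieldCount.addAndGet(row.length);
    }

    // Processes every row in parallel and returns the total number of fields seen.
    // forEach returns only after all rows have been processed.
    static int processAll(List<String[]> csvData) {
        fieldCount.set(0);
        csvData.parallelStream().forEach(ParallelCsvDemo::ProcessRecord);
        return fieldCount.get();
    }

    public static void main(String[] args) {
        // Hypothetical sample data standing in for the parsed CSV file.
        List<String[]> csvData = Arrays.asList(
                new String[] {"1", "alice", "42"},
                new String[] {"2", "bob", "17"},
                new String[] {"3", "carol", "99"});
        System.out.println("fields processed: " + processAll(csvData));
    }
}
```

Note that there is no manual splitting into sublists and no explicit start/join: the stream decides how to partition the list, and the call blocks until the work is done.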

By default, a parallel stream uses the common ForkJoinPool, whose parallelism defaults to one less than Runtime.getRuntime().availableProcessors() (the calling thread also takes part in the work). That is enough for tasks that do very little IO. If your tasks are IO-heavy and you would like a larger pool, simply submit the initial task to your own ForkJoinPool; the parallel stream will then use your pool instead of the common pool.

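// get() blocks until every row is processed; it throws InterruptedException and ExecutionException
ForkJoinPool forkJoinPool = new ForkJoinPool(20);
forkJoinPool.submit(() -> csvData.parallelStream().forEach(MyClass::ProcessRecord)).get();
forkJoinPool.shutdown();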
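A slightly fuller sketch of the custom-pool variant, with the exception handling and pool shutdown spelled out. The class name CustomPoolDemo, the generated rows, and the per-row body (it just records which thread ran it) are assumptions made for illustration. As far as I know, the fact that a parallel stream runs in the pool it was started from is observed JDK behavior rather than something the Stream API documentation promises, so it is worth verifying on your JDK; recording the thread names is one simple way to do that.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;

class CustomPoolDemo {

    // Processes all rows on a dedicated pool and returns the names of the
    // threads that did the work, so the pool actually used can be inspected.
    static Set<String> processAll(List<String[]> csvData, int parallelism) {
        Set<String> workerNames = ConcurrentHashMap.newKeySet();
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            // The stream is started from a task inside the pool.
            pool.submit(() -> csvData.parallelStream().forEach(row -> {
                // Placeholder for the real, IO-heavy per-row work.
                workerNames.add(Thread.currentThread().getName());
            })).get(); // blocks until every row has been processed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            // A failure inside the per-row work surfaces here as the cause.
            throw new IllegalStateException("row processing failed", e.getCause());
        } finally {
            pool.shutdown(); // release the worker threads
        }
        return workerNames;
    }

    public static void main(String[] args) {
        // Hypothetical generated rows standing in for the parsed CSV file.
        List<String[]> csvData = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            csvData.add(new String[] {String.valueOf(i), "row-" + i});
        }
        System.out.println(processAll(csvData, 20));
    }
}
```

The pool size (20 here, matching the thread count in the question) is the one knob to tune: for IO-bound work it can reasonably exceed the number of cores, while for CPU-bound work the common pool is usually sufficient.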