vibhas vibhas - 2 months ago 13
Java Question

Multiple ExecutorService finish after main

I need some input from you regarding a scenario for which I am doing some POC(Proof of Concept).
I am novice to multithreading in java and trying to some test. My requirement is I want to load millions of records using simple
java and then after doing some conversion with the data will insert in a database table.
For that I want to perform a simple test related to finishing of all task.

Currently I want to try that my main method finishes only after my executor services finishes. Below is the code which I have tried.
Can anyone please help me to know if thats the correct way to finish the main method after finishing the executor threads.

Your suggestion will be highly appreciated.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SimpleThreadPool {

public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
executor = Executors.newFixedThreadPool(5);
// Runnable worker = new WorkerThread("Thread executor :" + i);
executor.execute(new WorkerThread("Thread executor :" + i));
}
executor.shutdown();
while (!executor.isTerminated()) {
//System.out.println("Waiting");
}

System.out.println("Will start for Executor 1");

ExecutorService executor1 = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
// Runnable worker = new WorkerThread("Thread executor1 :" + i);
executor1.execute(new WorkerThread("Thread executor1 :" + i));
}
executor1.shutdown();
while (!executor1.isTerminated()) {
//System.out.println("Waiting");
}

System.out.println("Finished all threads");
//
String s=null;
s.toUpperCase();
}
}

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class WorkerThread implements Runnable {

private String command;

public WorkerThread(String s){
this.command=s;
}


public void run() {
ExecutorService executor2 = Executors.newFixedThreadPool(5);

Future loadData=executor2.submit(new LoadData());

System.out.println(" Start. Command = "+command);

try {
List listOfData=(List) loadData.get();

for(int i=0;i<listOfData.size();i++){
//Thread.sleep(500);
//System.out.println("Printing the value from list:"+listOfData.get(i));

ConversionLogic conversion= new ConversionLogic();
conversion.doConversion(command);
}



} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

System.out.println(" End."+command);

}



public String toString(){
return this.command;
}
}

class LoadData implements Callable{

public List call() throws Exception {

List<String> dataList= new ArrayList<String>();

for(int i=0;i<100;i++){
String data="Data_"+i;
//System.out.println("Data Added in List:"+data);
dataList.add(data);
}
Thread.sleep(10000);

return dataList;
}

}



public class ConversionLogic {

public void doConversion(String threadName){

try {

System.out.println("Converting Data for Thread starts:"+threadName);
Thread.sleep(5000);
System.out.println("Converting Data for Thread ends:"+threadName);


} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}





SO this is what I have understood from the answers provided below:

package stackoverflow.test;

import java.util.List;
import java.util.concurrent.*;

class SimpleThreadPool {

public static void main(String[] args) throws InterruptedException, ExecutionException {

ExecutorService executor = Executors.newFixedThreadPool(10);
ExecutorCompletionService<List> processor = new ExecutorCompletionService<List>(executor);

ExecutorService executor2 = Executors.newFixedThreadPool(10);
ExecutorCompletionService<List> processor2 = new ExecutorCompletionService<List>(executor2);


//start loading data
int procCount = 0;
for (int i = 0; i < 10; i++) {
Future loadData = processor.submit(new LoadData("Org"));
procCount++;
}

//now all loading tasks have been submitted and are being executed
System.out.println("All LOADING tasks have been submitted and are being executed");


//new work queue using the same executor (or another one if you want more parallelism)
ExecutorCompletionService<Void> converter = new ExecutorCompletionService<Void>(executor);

while (procCount-- > 0) {
Future<List> listOfDataFuture = processor.take(); //blocks until a 'LoadData' finishes
System.out.println("A loading task just completed");
List listOfData = listOfDataFuture.get();
for (int i = 0; i < listOfData.size(); i++) {
ConversionLogic conversion = new ConversionLogic(procCount + "_" + i);
converter.submit(conversion);
}
}
System.out.println("All LOADING tasks have been COMPLETED for Org");

//now all conversion tasks have been submitted and are being executed
System.out.println("All CONVERSION task have been submitted and are being executed for Org:");

/* You don't need to wait for conversion tasks to complete:
* they will be completed nonetheless. Wait for them (with take())
* if you need the results.
* */
executor.shutdown();
try {
System.out.println("Waiting for finish");
executor.awaitTermination(1000, TimeUnit.SECONDS);
System.out.println("Stopped nicely");
} catch (InterruptedException e) {
System.out.println("Could not stop in alloted time");
}

System.out.println("Fund Data Loading Starts:");
//___________________________________________________________________//

// Some argument to get Fund Data
int procCount1 = 0;
for (int i = 0; i < 5; i++) {
Future loadData = processor2.submit(new LoadData("Fund"));
procCount1++;
}

//now all loading tasks have been submitted and are being executed
System.out.println("All LOADING tasks have been submitted and are being executed for Fund:");


//new work queue using the same executor (or another one if you want more parallelism)
ExecutorCompletionService<Void> converter1 = new ExecutorCompletionService<Void>(executor2);

while (procCount1-- > 0) {
Future<List> listOfDataFuture = processor2.take(); //blocks until a 'LoadData' finishes
System.out.println("A loading task just completed for Fund:");
List listOfData = listOfDataFuture.get();
for (int i = 0; i < listOfData.size(); i++) {
ConversionLogic conversion = new ConversionLogic(procCount + "_" + i);
converter1.submit(conversion);
}
}
System.out.println("All LOADING tasks have been COMPLETED for Org");

//now all conversion tasks have been submitted and are being executed
System.out.println("All CONVERSION task have been submitted and are being executed for Org:");

/* You don't need to wait for conversion tasks to complete:
* they will be completed nonetheless. Wait for them (with take())
* if you need the results.
* */
executor2.shutdown();
try {
System.out.println("Waiting for finish");
executor.awaitTermination(1000, TimeUnit.SECONDS);
System.out.println("Stopped nicely");
} catch (InterruptedException e) {
System.out.println("Could not stop in alloted time");
}


System.out.println("<<<<<<<<<< End>>>>>>>>");
System.exit(0);



}
}


package stackoverflow.test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

class LoadData implements Callable {
String dataType;

public List call() throws Exception {

List<String> dataList = new ArrayList<String>();

for (int i = 0; i < 20; i++) {
String data = "Data_" + i;
System.out.println("Processing Data of Type :" + dataType + "Data is:"+data);
dataList.add(data);
}
Thread.sleep(2000);

return dataList;
}

LoadData(String type){
this.dataType=type;
}

}

package stackoverflow.test;

import java.util.concurrent.Callable;

class ConversionLogic implements Callable {

private String threadName;

public ConversionLogic(String threadName) {

this.threadName = threadName;
}

public Void call() throws Exception {
try {

System.out.println("Converting Data for Thread starts:" + threadName);
Thread.sleep(1000);
System.out.println("Converting Data for Thread ends:" + threadName);


} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

return null;
}
}





Looking to do something like fetching data for a set of records and perform conversion.But ended up having the following error.

org.postgresql.util.PSQLException: ERROR: operator does not exist: boolean > integer
Hint: No operator matches the given name and argument type(s). You might need to add explicit type casts.
Position: 57
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2102)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:1835)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:257)
at org.postgresql.jdbc2.AbstractJdbc2Statement.execute(AbstractJdbc2Statement.java:500)
at org.postgresql.jdbc2.AbstractJdbc2Statement.executeWithFlags(AbstractJdbc2Statement.java:374)
at org.postgresql.jdbc2.AbstractJdbc2Statement.executeQuery(AbstractJdbc2Statement.java:254)
at stackoverflow.tesst.LoadData.call(LoadData.java:23)
at stackoverflow.tesst.LoadData.call(LoadData.java:1)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)


package stackoverflow.tesst;

import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.*;

import connection.java.JdbcConnection;

class SimpleThreadPool {

public static void main(String[] args) throws InterruptedException,
ExecutionException {

ExecutorService executor = Executors.newFixedThreadPool(10);
ExecutorCompletionService<List> processor = new ExecutorCompletionService<List>(
executor);

ExecutorService executor2 = Executors.newFixedThreadPool(10);
ExecutorCompletionService<List> processor2 = new ExecutorCompletionService<List>(
executor2);

System.out.println("Connecting to DB...");
try {
System.out.println("Connection is :"
+ JdbcConnection.getConnection());
} catch (ClassNotFoundException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
} catch (SQLException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}

// start loading data
int procCount = 0;
int priceRange1 = 200;
int priceRange2 = priceRange1 + 200;
ExecutorCompletionService<Void> converter = new ExecutorCompletionService<Void>(
executor);

for (int i = 0; i < 10; i++) {

String query = "select code,name,price from Product where price ";

if (i == 0) {
String finalQuery = query + " <= " + priceRange1;
Future loadData = processor.submit(new LoadData("Org",
finalQuery));

Future<List> listOfDataFuture = processor.take();

System.out.println("A loading task just completed for thread : Org::"+i);
List listOfData = listOfDataFuture.get();
for (int j = 0; j < listOfData.size(); j++) {
ConversionLogic conversion = new ConversionLogic(procCount
+ "_" + j,listOfData);
converter.submit(conversion);
}

} else {

String finalQuery = query + " <= " + priceRange2 + " > "
+ priceRange1;
Future loadData = processor.submit(new LoadData("Org",
finalQuery));

Future<List> listOfDataFuture = processor.take();

System.out.println("A loading task just completed for thread : Org::"+i);
List listOfData = listOfDataFuture.get();
for (int j = 0; j < listOfData.size(); j++) {
ConversionLogic conversion = new ConversionLogic(procCount
+ "_" + j,listOfData);
converter.submit(conversion);
}
}
priceRange1 = priceRange2;
priceRange2 = priceRange2 + 200;

procCount++;
}

executor.shutdown();
try {
System.out.println("Waiting for finish");
executor.awaitTermination(1000, TimeUnit.SECONDS);
System.out.println("Stopped nicely");
} catch (InterruptedException e) {
System.out.println("Could not stop in alloted time");
}
System.out.println("<<<<<<<<<< End>>>>>>>>");
System.exit(0);

package stackoverflow.tesst;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

----------------------------------------------
import connection.java.JdbcConnection;

class LoadData implements Callable {
String dataType;
Connection conn;
String query;

public List call() throws Exception {
List<Product> dataList = new ArrayList<Product>();
try {
conn=JdbcConnection.getConnection();
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(this.query);
while(rs.next()){
System.out.println(rs.getString("code"));
System.out.println(rs.getString("name"));
System.out.println(rs.getInt("price"));
Product p= new Product(rs.getString("code"),rs.getString("name"),rs.getInt("price"));
dataList.add(p);
}
rs.close();//conn.close();
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

Thread.sleep(2000);

return dataList;
}

LoadData(String type,String query){
this.dataType=type;
this.query=query;
}

}


}
}

---------------------------
package stackoverflow.tesst;

import java.util.List;
import java.util.concurrent.Callable;

class ConversionLogic implements Callable {

private String threadName;
List<Product> productList;

public ConversionLogic(String threadName,List<Product> productList) {

this.threadName = threadName;
this.productList=productList;
}

public Void call() throws Exception {
try {

System.out.println("Converting Data for Thread starts:" + threadName);
Thread.sleep(1000);
int listSize=productList.size();
for(int i=0;i<listSize;i++){
//Do conversion for product let say
System.out.println("Print product in Conversion:"+productList.get(i).getPrice());
}
System.out.println("Converting Data for Thread ends:" + threadName);


} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

return null;
}
}
------------------------------------
package connection.java;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class JdbcConnection {

static Connection conn;
static String user;
static String pass;
static String dbURL;

public static Connection getConnection() throws ClassNotFoundException,
SQLException {
Class.forName("org.postgresql.Driver");

dbURL = "jdbc:postgresql://localhost:5433:postgres";
user = "postgres";
pass = "root";
Connection conn = DriverManager.getConnection(dbURL, user, pass);
Statement stmt = conn.createStatement();
System.out.println("Created DB Connection....");
return conn;

}

}

Answer

There are way to many thread pools in your code and, in general, spawning new threads inside a spawned thread is not a good idea: it can easily get out of control. In your case, you don't need the WorkerThread: you already have the thread pool provided by the Java framework (ExecutorService).

Since you need the result from a thread (LoadData) to be processed (ConversionLogic) I would use also an ExecutorCompletionService to help with gathering results from LoadData.

Following the refactored code. I've ditched the WorkerThread and used only a threadpool (altough you can use two if you want more parallelism), also the ConversionLogic now implements Callable so that it can be easily processed by the thread pool.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

class SimpleThreadPool {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(10);
        ExecutorCompletionService<List> processor = new ExecutorCompletionService<List>(executor);


        //start loading data
        int procCount = 0;
        for (int i = 0; i < 10; i++) {
            Future loadData = processor.submit(new LoadData());
            procCount++;
        }
        //now all loading tasks have been submitted and are being executed
        System.out.println("All LOADING tasks have been submitted and are being executed");


        //new work queue using the same executor (or another one if you want more parallelism)
        ExecutorCompletionService<Void> converter = new ExecutorCompletionService<Void>(executor);

        while (procCount-- > 0) {
            Future<List> listOfDataFuture = processor.take(); //blocks until a 'LoadData' finishes
            System.out.println("A loading task just completed");
            List listOfData = listOfDataFuture.get();
            for (int i = 0; i < listOfData.size(); i++) {
                ConversionLogic conversion = new ConversionLogic(procCount + "_" + i);
                converter.submit(conversion);
            }
        }
        System.out.println("All LOADING tasks have been COMPLETED");

        //now all conversion tasks have been submitted and are being executed
        System.out.println("All CONVERSION task have been submitted and are being executed");

        /* You don't need to wait for conversion tasks to complete:
          * they will be completed nonetheless. Wait for them (with take())
          * if you need the results.
         * */    
        executor.shutdown();
        System.out.println(" End.");


    }
}

class LoadData implements Callable {

    public List call() throws Exception {

        List<String> dataList = new ArrayList<String>();

        for (int i = 0; i < 20; i++) {
            String data = "Data_" + i;
            System.out.println("Data Added in List:" + data);
            dataList.add(data);
        }
        Thread.sleep(2000);

        return dataList;
    }

}


class ConversionLogic implements Callable {

    private String threadName;

    public ConversionLogic(String threadName) {

        this.threadName = threadName;
    }

    @Override
    public Void call() throws Exception {
        try {

            System.out.println("Converting Data for Thread starts:" + threadName);
            Thread.sleep(1000);
            System.out.println("Converting Data for Thread ends:" + threadName);


        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        return null;
    }
}
Comments