Bast Bast - 3 years ago 270
Java Question

Java - ExecutorService has a maximum size

Is there any way to go through a huge database and run jobs in parallel on batches of entries?
I tried ExecutorService, but we have to call shutdown() in order to know the pool size...

So my best solution is:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TestCode
{
private static List<String> getIds(int dbOffset, int nbOfArticlesPerRequest)
{
return Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19", "20", "21", "22", "23", "24", "25", "26", "27", "28", "29");
}

public static void main(String args[]) throws Exception
{
int dbOffset = 0;
int nbOfArticlesPerRequest = 100;
int MYTHREADS = 10;
int loopIndex = 0;
boolean bContinue=true;
Runnable worker;



while(bContinue) // in this loop we'll constantly fill the pool list
{
loopIndex++;
ExecutorService executor = Executors.newFixedThreadPool(MYTHREADS); // NOT IDEAL, BUT EXECUTORSERVICE CANNOT BE REUSED ONCE SHUTDOWN...

List<String> ids = getIds(dbOffset, nbOfArticlesPerRequest ); // getIds(offset, rows_number)
for(String id: ids) {
worker = new MyRunnable(id);
executor.execute(worker);
}

executor.shutdown();
while (!executor.isTerminated()) {
System.out.println("Pool size is now " + ((ThreadPoolExecutor) executor).getActiveCount()+
" - queue size: "+ ((ThreadPoolExecutor) executor).getQueue().size()
);
TimeUnit.MILLISECONDS.sleep(500);
}

if(loopIndex>=3) {
System.out.println("\nEnd the loop #"+loopIndex+" ===> STOOOP!\n");
bContinue = false;
}
dbOffset+=nbOfArticlesPerRequest;
}
}



public static class MyRunnable implements Runnable {

private final String id;

MyRunnable(String id) {
this.id = id;
}

@Override
public void run()
{
System.out.println("Thread '"+id+"' started");
try {
TimeUnit.MILLISECONDS.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Thread '"+id+"' stopped");
}
}
}


This works fine, but the drawback is that at the end of every loop iteration I have to wait for the last threads to finish.

e.g.: when only 3 threads are running...

I did the following in order to solve this problem, but is that "safe"/correct?

BTW: is there any way to know how many tasks/threads are in the queue?

int dbOffset = 0;
int nbOfArticlesPerRequest = 5; //100;
int MYTHREADS = 2;
int loopIndex = 0;

ExecutorService executor = Executors.newFixedThreadPool(MYTHREADS); // **HERE IT WOULD BE A GLOBAL VARIABLE**
while(bContinue) // in this loop we'll constantly fill the pool list
{
loopIndex++;

List<String> ids = getIds(dbOffset, nbOfArticlesPerRequest ); // getIds(offset, rows_number)
for(String id: ids) {
worker = new MyRunnable(id);
executor.execute(worker);
}

while (!executor.isTerminated() && ((ThreadPoolExecutor) executor).getActiveCount() >= MYTHREADS) {
System.out.println("Pool size is now " + ((ThreadPoolExecutor) executor).getActiveCount()+
" - queue size: "+ ((ThreadPoolExecutor) executor).getQueue().size()
);
TimeUnit.MILLISECONDS.sleep(500);
}

if(loopIndex>=3) {
System.out.println("\nEnd the loop #"+loopIndex+" ===> STOOOP!\n");
bContinue = false;
}
dbOffset+=nbOfArticlesPerRequest;
}

executor.shutdown();
// Wait until all threads are finished
while (!executor.isTerminated()) {
System.out.println("Pool size is now " + ((ThreadPoolExecutor) executor).getActiveCount()+
" - queue size: "+ ((ThreadPoolExecutor) executor).getQueue().size()
);
TimeUnit.MILLISECONDS.sleep(500);
}


EDIT:

I'm trying to launch 1 or 10 million tasks, so (I assume) I cannot put them all in the queue... That's why I use a global executor, so that there are always some tasks waiting in the queue (for that I cannot shut down the executor, otherwise it's no longer usable).

Latest code version:

int dbOffset = 0;
int nbOfArticlesPerRequest = 5; //100;
int MYTHREADS = 2;
int loopIndex = 0;

ThreadPoolExecutor executorPool = new ThreadPoolExecutor(MYTHREADS, MYTHREADS, 0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); // **HERE IT WOULD BE A GLOBAL VARIABLE**
while(bContinue) // in this loop we'll constantly fill the pool list
{
loopIndex++;

List<String> ids = getIds(dbOffset, nbOfArticlesPerRequest ); // getIds(offset, rows_number)
for(String id: ids) {
worker = new MyRunnable(id);
executorPool.execute(worker);
}

while (executorPool.getActiveCount() >= MYTHREADS || executorPool.getQueue().size()> Math.max(1, MYTHREADS -2))
{
System.out.println("Pool size is now " + executorPool.getActiveCount()+
" - queue size: "+ executorPool.getQueue().size()
);

if(executorPool.getQueue().size() <= Math.max(1, MYTHREADS-2)) {
System.out.println("Less than "+Math.max(1, MYTHREADS-2)+" threads in queue ---> fill the queue");
break;
}

TimeUnit.MILLISECONDS.sleep(2000);
}

if(loopIndex>=3) {
System.out.println("\nEnd the loop #"+loopIndex+" ===> STOOOP!\n");
bContinue = false;
}
dbOffset+=nbOfArticlesPerRequest;
}

executorPool.shutdown();
// Wait until all threads are finished
while (!executorPool.isTerminated()) {
System.out.println("Pool size is now " + executorPool.getActiveCount()+
" - queue size: "+ executorPool.getQueue().size()
);
TimeUnit.MILLISECONDS.sleep(500);
}


Thanks in advance

Answer Source

Update

Now it's clear to me that your main concern is that you can't submit 10 million tasks at once.

Don't worry, you can submit all of them to the executor. The number of tasks that actually run in parallel is limited by the underlying thread pool size. That is, if you have a pool size of 2, only two tasks are executed at a time; the rest sit in the queue and wait for a free thread.

By default, Executors.newFixedThreadPool() creates an executor backed by an unbounded LinkedBlockingQueue (capacity Integer.MAX_VALUE), so your millions of tasks would fit there, provided there is enough heap memory to hold them.
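To make the point above concrete, here is a minimal sketch that submits far more tasks than there are threads and records how many ever ran at the same time. The class name QueueDemo, the method peakConcurrency and the task counts are made up for illustration. The pool is built with the same arguments as in the question's latest code so that getQueue() is available without a cast.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class QueueDemo {

    /**
     * Submits taskCount trivial tasks to a fixed-size pool and returns the
     * highest number of tasks that were observed running at the same time.
     * (Hypothetical helper, for illustration only.)
     */
    static int peakConcurrency(int threads, int taskCount) throws InterruptedException {
        // Fixed number of threads, unbounded queue.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());

        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();

        for (int i = 0; i < taskCount; i++) {
            executor.execute(() -> {
                int now = running.incrementAndGet();   // this task is now running
                peak.accumulateAndGet(now, Math::max); // remember the maximum seen
                running.decrementAndGet();             // this task is done
            });
        }

        // Tasks that have not been picked up by a thread yet are waiting here.
        System.out.println("Queued right after submission: " + executor.getQueue().size());

        // No more tasks will be submitted; wait for the queued ones to finish.
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Peak concurrency: " + peakConcurrency(2, 100_000));
    }
}
```

The returned peak never exceeds the number of threads, however many tasks are submitted. The sketch also uses awaitTermination() instead of polling isTerminated() in a sleep loop, which is usually simpler when you only need to block until the work is done.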


You can use the ExecutorService.submit() method, which returns a Future. Then you can examine the state of your Future tasks (e.g. with the isDone() and isCancelled() methods).

An executor is typically something you don't shut down explicitly; it lives for the whole application lifecycle. With this approach you don't need to shut it down in order to know how many tasks are pending.

List<Future<?>> tasks = new ArrayList<>();
for (String id : ids) {
    Future<?> task = executorService.submit(() -> {
        // do work
    });
    tasks.add(task);
}

long pending = tasks.stream().filter(future -> !future.isDone()).count();
System.out.println(pending + " tasks are still pending");

Moreover, please note that tasks and threads are not interchangeable terms. In your case, the executor has a fixed number of threads. You can submit more tasks than that, but the rest will sit in the executor's queue until a thread is free to run them.
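If holding millions of queued tasks in memory is still a concern, one alternative that is not part of the answer above is to give the pool a bounded queue and a CallerRunsPolicy. When the queue is full, the submitting thread runs the task itself, which naturally slows down submission. The following is only a sketch under that assumption; the class name BoundedPoolDemo, the method runAll and the sizes are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedPoolDemo {

    /**
     * Runs taskCount trivial tasks through a pool whose queue holds at most
     * queueCapacity waiting tasks, and returns how many tasks completed.
     * (Hypothetical helper, for illustration only.)
     */
    static int runAll(int threads, int queueCapacity, int taskCount) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(queueCapacity),
                // Queue full? Run the task in the thread that called execute().
                new ThreadPoolExecutor.CallerRunsPolicy());

        AtomicInteger completed = new AtomicInteger();

        for (int i = 0; i < taskCount; i++) {
            // Either queued for a pool thread, or run right here if the queue is full.
            executor.execute(() -> completed.incrementAndGet());
        }

        // All tasks have been handed over; wait for the queued ones to finish.
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Completed: " + runAll(2, 10, 5_000));
    }
}
```

With this setup the loop that reads ids from the database can submit tasks without checking getActiveCount() or the queue size, because at most queueCapacity tasks are ever waiting. The trade-off is that the submitting thread occasionally does task work itself.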
