Java question asked by user1071840, 2 months ago (score: 5)

Wait for daemon threads to complete an iteration using an executor service

I have to parallelize an existing background task so that, instead of serially processing 'x' resources, it finishes the job at hand in parallel using only 'y' threads (y << x). This task runs constantly in the background and keeps processing resources.

Code is structured as follows:

abstract class BaseBackground implements Runnable {
    @Override
    public void run() {
        int[] resources = findResources(...);

        for (int resource : resources) {
            processResource(resource);
        }

        stopProcessing();
    }

    public abstract void processResource(final int resource);

    public void stopProcessing() {
        // Override by subclass as needed
    }
}

class ChildBackground extends BaseBackground {

    @Override
    public void processResource(final int resource) {
        // does some work here
    }

    @Override
    public void stopProcessing() {
        // reset some counts and emit metrics
    }
}


I've modified ChildBackground in the following manner:

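import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

class ChildBackground extends BaseBackground {

    // Shared work queue: filled by processResource, drained by the workers
    private final BlockingQueue<Integer> resourcesToBeProcessed = new LinkedBlockingQueue<>();

    public ChildBackground() {
        // Long-lived pool: the two workers are started once and never exit on their own
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 2; ++i) {
            executorService.submit(new ResourceProcessor());
        }
    }

    @Override
    public void processResource(final int resource) {
        resourcesToBeProcessed.add(resource);
    }

    @Override
    public void stopProcessing() {
        // reset some counts and emit metrics
    }

    public class ResourceProcessor implements Runnable {
        @Override
        public void run() {
            try {
                while (true) {
                    int nextResource = resourcesToBeProcessed.take();
                    // does some work
                }
            } catch (InterruptedException e) {
                // pool is shutting down: restore the flag and let the worker exit
                Thread.currentThread().interrupt();
            }
        }
    }
}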


I am not creating and tearing down the ExecutorService on every iteration because garbage collection is a bit of a problem in my service, although I don't know how bad it would actually be, since I won't spawn more than 10 threads per iteration.

I don't understand how to wait for all the ResourceProcessors to finish processing the resources of one iteration, so that I can reset some counts and emit metrics in stopProcessing. I've considered the following options:

1) executorService.awaitTermination(timeout). This won't really work, as it will always block until the timeout: the ResourceProcessor threads never actually finish their jobs.

2) I can find out the number of resources after findResources, make it available to the child class, and have each ResourceProcessor increment a count of processed resources. stopProcessing would then have to wait until all the resources have been processed before resetting the counts. I need something like a CountDownLatch, but one that counts UP instead. This option involves a lot of state management, which I am not particularly fond of.

3) I could change public abstract void processResource(final int resource) to also take the total resource count, and have the child class wait until the threads have processed that many resources. There'll be some state management in this case too, but it will be limited to the child class.
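For option 2, a latch that counts up may not be needed at all: once findResources returns, the total for the iteration is known, so a fresh java.util.concurrent.CountDownLatch sized to that total can count down instead, with no hand-written wait()/notify(). Below is a minimal sketch under stated assumptions: the class LatchPerIteration, its runIteration/close methods and the daemon thread factory are hypothetical, and an AtomicInteger stands in for the real work. Each queued item carries the latch of its own iteration, so the long-lived workers never need to know which iteration they are serving.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

final class LatchPerIteration {
    /** A queued resource plus the latch of the iteration it belongs to. */
    private static final class Work {
        final int resource;
        final CountDownLatch done;

        Work(int resource, CountDownLatch done) {
            this.resource = resource;
            this.done = done;
        }
    }

    private final BlockingQueue<Work> queue = new LinkedBlockingQueue<>();
    private final AtomicInteger processedThisIteration = new AtomicInteger();
    private final ExecutorService workers;

    LatchPerIteration(int threads) {
        // Daemon threads (an assumption) so a forgotten close() cannot keep the JVM alive
        workers = Executors.newFixedThreadPool(threads, r -> {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        });
        for (int i = 0; i < threads; i++) {
            workers.submit(this::workLoop); // long-lived workers, as in the question
        }
    }

    private void workLoop() {
        try {
            while (true) {
                Work w = queue.take();
                try {
                    processedThisIteration.incrementAndGet(); // stand-in for real work on w.resource
                } finally {
                    w.done.countDown(); // count down even if the work throws
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // shutdownNow(): let the worker exit
        }
    }

    /** One iteration: enqueue every resource, then block until all of them are processed. */
    int runIteration(int[] resources) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(resources.length); // total known after findResources
        for (int resource : resources) {
            queue.add(new Work(resource, done));
        }
        done.await(); // returns immediately when resources is empty
        return processedThisIteration.getAndSet(0); // "reset some counts and emit metrics"
    }

    void close() {
        workers.shutdownNow(); // interrupts the workers blocked in take()
    }

    public static void main(String[] args) throws InterruptedException {
        LatchPerIteration background = new LatchPerIteration(2);
        for (int iteration = 0; iteration < 3; iteration++) {
            // prints "processed 5" on each of the three iterations
            System.out.println("processed " + background.runIteration(new int[] {1, 2, 3, 4, 5}));
        }
        background.close();
    }
}
```

Because await() on a latch created with a count of 0 returns immediately, an iteration with no resources does not block.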

In either of the last two cases, I will have to add wait() & notify() logic, but I am not confident about my approach. This is what I have:

import com.google.common.base.Preconditions;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Option 3: assumes BaseBackground now declares processResource(int resource, int totalResources)
class ChildBackground extends BaseBackground {

    private static final int UNSET_TOTAL_RESOURCES = -1;

    private final BlockingQueue<Integer> resourcesToBeProcessed = new LinkedBlockingQueue<>();

    // Only touched by the background thread that calls processResource/stopProcessing
    private int totalResources = UNSET_TOTAL_RESOURCES;
    private final AtomicInteger resourcesProcessed = new AtomicInteger(0);

    public ChildBackground() {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 2; ++i) {
            executorService.submit(new ResourceProcessor());
        }
    }

    @Override
    public void processResource(final int resource, final int totalResources) {
        if (this.totalResources == UNSET_TOTAL_RESOURCES) {
            this.totalResources = totalResources;
        } else {
            Preconditions.checkState(this.totalResources == totalResources,
                    "Consecutive poll requests are using different total resources count, previous=%s, new=%s",
                    this.totalResources, totalResources);
        }
        resourcesToBeProcessed.add(resource);
    }

    @Override
    public void stopProcessing() {
        try {
            waitForAllResourcesToBeProcessed();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt instead of swallowing it
        }
        resourcesProcessed.set(0);
        totalResources = UNSET_TOTAL_RESOURCES;
        // reset some counts and emit metrics
    }

    private void incrementProcessedResources() {
        synchronized (resourcesProcessed) {
            resourcesProcessed.getAndIncrement();
            resourcesProcessed.notifyAll(); // wake the thread waiting in stopProcessing
        }
    }

    private void waitForAllResourcesToBeProcessed() throws InterruptedException {
        synchronized (resourcesProcessed) {
            // An iteration with no resources never sets totalResources, so don't wait for it
            while (totalResources != UNSET_TOTAL_RESOURCES
                    && resourcesProcessed.get() < totalResources) {
                resourcesProcessed.wait();
            }
        }
    }

    public class ResourceProcessor implements Runnable {
        @Override
        public void run() {
            try {
                while (true) {
                    int nextResource = resourcesToBeProcessed.take();
                    try {
                        // does some work
                    } finally {
                        incrementProcessedResources();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // let the worker exit on shutdown
            }
        }
    }
}


I'm not sure whether using an AtomicInteger is the right way to do it, and if it is, whether I still need to call wait() and notify(). If I don't use wait() and notify(), I don't even have to execute everything in a synchronized block.
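On the AtomicInteger question: if every read and write of the counter already happens inside synchronized blocks on the same monitor, a plain int is enough, and wait()/notifyAll() are still what lets the waiting thread sleep instead of spinning. A minimal sketch, assuming a hypothetical ProcessedCounter class and demo helper that are not part of the original code:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical helper: a plain int guarded by this object's monitor, no AtomicInteger. */
final class ProcessedCounter {
    private int processed; // guarded by "this"

    /** Called by a worker after each resource, including when processing failed. */
    synchronized void increment() {
        processed++;
        notifyAll(); // wake the thread blocked in awaitAndReset
    }

    /** Blocks until {@code total} resources are processed, then resets for the next iteration. */
    synchronized int awaitAndReset(int total) throws InterruptedException {
        while (processed < total) { // re-check after every wakeup (spurious wakeups happen)
            wait(); // releases the monitor while sleeping
        }
        int result = processed;
        processed = 0;
        return result;
    }

    /** Hypothetical demo: {@code tasks} jobs on a pool of {@code threads} workers. */
    static int demo(int tasks, int threads) throws InterruptedException {
        ProcessedCounter counter = new ProcessedCounter();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.submit(() -> {
                    try {
                        // does some work
                    } finally {
                        counter.increment();
                    }
                });
            }
            return counter.awaitAndReset(tasks);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("processed " + demo(10, 2)); // prints "processed 10"
    }
}
```

notifyAll() is used instead of notify() so a wakeup cannot go to the wrong thread if more than one ever waits, and the while loop re-checks the condition after each wakeup, as the Object.wait() documentation recommends.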

Please let me know your thoughts about this approach: should I simply create and shut down an ExecutorService on every iteration, or is there a fourth approach I should pursue?

Answer

Your code seems unnecessarily complex. Why have your own queue when the ExecutorService already has one inside it? You are doing a whole bunch of bookkeeping yourself when I think you can let the stock ExecutorService handle it for you.

I'd define your jobs as:

// non-static inner class of ChildBackground, so it can call incrementProcessedResources()
public class ResourceProcessor implements Runnable {
   private final int resource;
   public ResourceProcessor(int resource) {
      this.resource = resource;
   }
   @Override
   public void run() {
      try {
         // does some work
      } finally {
         // if this is still necessary then you should use a `Future` instead
         incrementProcessedResources();
      }
   }
}

Then you could submit them like:

ExecutorService executorService = Executors.newFixedThreadPool(2);
// one job per resource returned by findResources(...)
for (int resource : resources) {
   executorService.submit(new ResourceProcessor(resource));
}
// shutdown the thread pool after the last submit
executorService.shutdown();
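Putting the pieces together, one iteration with a fresh pool might look roughly like this. It is a sketch: PoolPerIteration and runIteration are hypothetical names, the one-minute timeout is an arbitrary assumption, and an AtomicInteger stands in for the real per-resource work and metrics.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class PoolPerIteration {
    /** One iteration: a fresh pool, one job per resource, then wait for all of them. */
    static int runIteration(int[] resources, int threads) throws InterruptedException {
        AtomicInteger processed = new AtomicInteger(); // stand-in for real work and metrics
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int resource : resources) {
            pool.submit(() -> {
                // does some work with `resource` here
                processed.incrementAndGet();
            });
        }
        pool.shutdown(); // accept no new jobs; already-submitted jobs still run
        // Returns true as soon as the last job finishes; the 1-minute limit is an assumption
        if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
            pool.shutdownNow();
            throw new IllegalStateException("iteration did not finish within the timeout");
        }
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("processed " + runIteration(new int[] {1, 2, 3, 4, 5}, 2)); // prints "processed 5"
    }
}
```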

> 1) executorService.awaitTermination(timeout). This won't really work as it will always block until the timeout because the ResourceProcessor threads will never really finish their jobs

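This would now work: after shutdown(), the pool runs the jobs already submitted and then lets its threads exit, so awaitTermination(...) returns as soon as the last job finishes instead of blocking until the timeout.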
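If you would rather keep a single long-lived pool (the garbage-collection concern in the question), ExecutorService.invokeAll gives a similar wait-for-everything behavior without a shutdown: it returns only after every submitted Callable has completed. A minimal sketch; ReusedPoolIteration, runIteration and close are hypothetical names, the resource id stands in for a real result, and the daemon thread factory is an assumption so the pool threads do not keep the JVM alive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ReusedPoolIteration {
    private final ExecutorService pool; // created once, reused by every iteration

    ReusedPoolIteration(int threads) {
        pool = Executors.newFixedThreadPool(threads, r -> {
            Thread t = new Thread(r);
            t.setDaemon(true); // assumption: background threads should not keep the JVM alive
            return t;
        });
    }

    /** One iteration: invokeAll blocks until every job has completed, normally or not. */
    List<Integer> runIteration(int[] resources) throws InterruptedException, ExecutionException {
        List<Callable<Integer>> jobs = new ArrayList<>();
        for (int resource : resources) {
            jobs.add(() -> {
                // does some work here; the resource id stands in for a real result
                return resource;
            });
        }
        List<Integer> processed = new ArrayList<>();
        for (Future<Integer> future : pool.invokeAll(jobs)) {
            processed.add(future.get()); // already done; rethrows a job's exception
        }
        return processed;
    }

    void close() {
        pool.shutdown();
    }

    public static void main(String[] args) throws Exception {
        ReusedPoolIteration background = new ReusedPoolIteration(2);
        System.out.println(background.runIteration(new int[] {4, 5, 6})); // prints [4, 5, 6]
        System.out.println(background.runIteration(new int[] {7}));       // prints [7]
        background.close();
    }
}
```

invokeAll returns the futures in the same order as the submitted jobs, which is why the results come back in submission order even though the jobs may finish in any order.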

> 2) I can find out the number of resources [have finished].

Do you still need this if you can call awaitTermination(...)?

> 3) I could update the public abstract void processResource(final int resource) to include count of total resources and have the child process wait until all threads have processed total resources...

Same question. Is this needed?

If you actually need to know which requests were processed, then, as @ScaryWombat mentioned, you could use Future<Integer> and Callable<Integer>, or use an ExecutorCompletionService.
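For the ExecutorCompletionService route, here is a sketch that submits one Callable<Integer> per resource to an existing pool and collects each resource id as its job finishes (completion order, not submission order). CompletionServiceIteration and runIteration are hypothetical names; returning the resource id stands in for whatever result you actually need.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CompletionServiceIteration {
    /** Submits one job per resource to an existing pool; collects ids as the jobs finish. */
    static List<Integer> runIteration(ExecutorService pool, int[] resources)
            throws InterruptedException, ExecutionException {
        // A completion service per iteration is cheap: it wraps the shared pool with its own result queue
        CompletionService<Integer> completions = new ExecutorCompletionService<>(pool);
        for (int resource : resources) {
            completions.submit(() -> {
                // does some work here
                return resource;
            });
        }
        List<Integer> processed = new ArrayList<>();
        for (int i = 0; i < resources.length; i++) {
            // take() blocks until the next job finishes; get() rethrows that job's exception
            processed.add(completions.take().get());
        }
        return processed; // completion order, not submission order
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2); // long-lived in a real service
        try {
            List<Integer> done = runIteration(pool, new int[] {1, 2, 3});
            System.out.println("processed " + done.size() + " resources: " + done);
        } finally {
            pool.shutdown();
        }
    }
}
```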

> Futures aren't an option because the executor threads run within a tight loop that stops only when the service is deactivated.

Can you explain this more?

Hope this helps.