Tofig Hasanov - 3 months ago
Java Question

Java parallel stream not using the optimal number of threads when using newCachedThreadPool()

I have made two separate implementations of parallel reads from a database.
The first implementation uses an ExecutorService created with Executors.newCachedThreadPool(), together with Futures: I submit one task per read, each returning a Future, and after all the tasks have been submitted I call get() on each of them. This implementation works fine and is fast enough.
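The first implementation is not shown, but a minimal sketch of the same Future-based pattern might look like this. slowLookup is a hypothetical stand-in for FeedbackDao.find that only sleeps for 100 ms, and the IDs are made-up sample data.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureFanOut {

    // Hypothetical stand-in for FeedbackDao.find: simulates a blocking 100 ms database read.
    static String slowLookup(String personId, String movieId) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return personId + ":" + movieId;
    }

    // Submit one task per (person, movie) pair first, then block on each Future in submission order.
    static List<String> fetchAll(ExecutorService pool, List<String> personIds, List<String> movieIds)
            throws InterruptedException, ExecutionException {
        List<Future<String>> futures = new ArrayList<>();
        for (String personId : personIds) {
            for (String movieId : movieIds) {
                futures.add(pool.submit(() -> slowLookup(personId, movieId)));
            }
        }
        List<String> results = new ArrayList<>();
        for (Future<String> future : futures) {
            results.add(future.get());
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        // A cached pool starts a new thread whenever no idle one is available,
        // so all six simulated reads can block at the same time.
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            long start = System.nanoTime();
            List<String> results = fetchAll(pool, Arrays.asList("p1", "p2"), Arrays.asList("m1", "m2", "m3"));
            long elapsedMs = (System.nanoTime() - start) / 1_000_000;
            System.out.println(results + " in " + elapsedMs + " ms");
        } finally {
            pool.shutdown();
        }
    }
}
```

Because the cached pool keeps adding threads while the existing ones are busy, the six 100 ms reads overlap, so the run should take roughly 100 ms rather than 600 ms.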

The second implementation uses parallel streams. When I submit the parallel stream call to the same ExecutorService pool, it runs almost 5 times slower, and it seems not to use as many threads as I would hope. When I instead submit it to ForkJoinPool pool = new ForkJoinPool(50), it runs as fast as the first implementation.
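For comparison, here is a minimal sketch of the ForkJoinPool(50) variant, assuming a hypothetical slowRead method that sleeps for 20 ms in place of FeedbackDao.find and an IntStream of made-up IDs in place of the person/movie pairs. It submits the parallel stream to the custom pool and counts how many elements were processed on a thread that is not one of that pool's workers. Running a parallel stream inside a custom pool like this relies on how the JDK schedules stream tasks internally; the Stream API documentation does not promise it.

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class CustomPoolDemo {

    // Hypothetical stand-in for FeedbackDao.find: a read that blocks for 20 ms.
    static int slowRead(int id) {
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return id;
    }

    // True when the calling thread is a worker of the given ForkJoinPool.
    static boolean isWorkerOf(ForkJoinPool pool) {
        Thread t = Thread.currentThread();
        return t instanceof ForkJoinWorkerThread && ((ForkJoinWorkerThread) t).getPool() == pool;
    }

    // Runs the parallel stream from inside `pool`; counts elements processed on any other thread.
    static List<Integer> readAll(ForkJoinPool pool, int count, AtomicInteger strayElements) throws Exception {
        return pool.submit(() -> IntStream.range(0, count)
                .parallel()
                .map(id -> {
                    if (!isWorkerOf(pool)) {
                        strayElements.incrementAndGet();
                    }
                    return slowRead(id);
                })
                .boxed()
                .collect(Collectors.toList()))
                .get();
    }

    public static void main(String[] args) throws Exception {
        ForkJoinPool pool = new ForkJoinPool(50); // same size as in the question
        try {
            AtomicInteger stray = new AtomicInteger();
            List<Integer> ids = readAll(pool, 200, stray);
            System.out.println(ids.size() + " reads, " + stray.get() + " ran outside the custom pool");
        } finally {
            pool.shutdown();
        }
    }
}
```

The stray count should stay at 0: the stream's root task runs on a worker of the custom pool, so every subtask it forks is queued in that same pool.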

My question is:

Why do parallel streams under-utilize threads in the newCachedThreadPool() version?

Here is the code for the second implementation (I am not posting the first implementation, because that one works fine):

// FeedbackDao and Summary are application classes (not shown).
private static final ExecutorService pool = Executors.newCachedThreadPool();

// Build every (personId, movieId) pair up front.
final List<AbstractMap.SimpleImmutableEntry<String, String>> simpleImmutableEntryStream =
        personIdList.stream()
                .flatMap(personId -> movieIdList.stream()
                        .map(movieId -> new AbstractMap.SimpleImmutableEntry<>(personId, movieId)))
                .collect(Collectors.toList());

// Run the parallel stream from a thread of the cached pool.
final Future<Map<String, List<Summary>>> futureMovieSummaryForPerson = pool.submit(() -> {
    final Stream<Summary> summaryStream = simpleImmutableEntryStream.parallelStream()
            .map(inputPair -> FeedbackDao.find(inputPair.getKey(), inputPair.getValue())) // one DB read per pair
            .filter(Objects::nonNull);
    return summaryStream.collect(Collectors.groupingBy(Summary::getPersonId));
});

Answer

This is related to how ForkJoinTask.fork() is implemented. If the current thread is a ForkJoinPool worker thread, fork() submits the new subtask to that same pool; otherwise it submits it to the common pool, whose parallelism defaults to one less than the number of available processors on your machine. The threads created by Executors.newCachedThreadPool() are not recognized as ForkJoinPool threads, so the parallel stream's subtasks go to the common pool.
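The effect can be observed with a small sketch (class and method names are hypothetical, and a 20 ms sleep stands in for the database read). It submits a parallel stream to an Executors.newCachedThreadPool() executor, as in the question, and tallies whether each element runs on the submitting cached-pool thread, on a common-pool worker, or anywhere else.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class CachedPoolDemo {

    // Counts of where the stream elements actually executed.
    static final class Tally {
        final AtomicInteger onCaller = new AtomicInteger();
        final AtomicInteger onCommonPool = new AtomicInteger();
        final AtomicInteger elsewhere = new AtomicInteger();
    }

    // Classifies the current thread relative to the cached-pool thread that started the stream.
    static void record(Thread caller, Tally tally) {
        Thread t = Thread.currentThread();
        if (t == caller) {
            tally.onCaller.incrementAndGet();
        } else if (t instanceof ForkJoinWorkerThread
                && ((ForkJoinWorkerThread) t).getPool() == ForkJoinPool.commonPool()) {
            tally.onCommonPool.incrementAndGet();
        } else {
            tally.elsewhere.incrementAndGet();
        }
    }

    // Hypothetical stand-in for the database read: sleeps for 20 ms.
    static int slowRead(int id) {
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return id;
    }

    // Starts the parallel stream from a thread of the given pool, as the question does.
    static List<Integer> readAll(ExecutorService pool, int count, Tally tally) throws Exception {
        Future<List<Integer>> future = pool.submit(() -> {
            Thread caller = Thread.currentThread(); // a plain thread, not a ForkJoinWorkerThread
            return IntStream.range(0, count)
                    .parallel()
                    .map(id -> {
                        record(caller, tally);
                        return slowRead(id);
                    })
                    .boxed()
                    .collect(Collectors.toList());
        });
        return future.get();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            Tally tally = new Tally();
            readAll(pool, 200, tally);
            System.out.println("calling thread: " + tally.onCaller.get()
                    + ", common pool workers: " + tally.onCommonPool.get()
                    + ", elsewhere: " + tally.elsewhere.get());
        } finally {
            pool.shutdown();
        }
    }
}
```

The "elsewhere" count should be 0: none of the cached pool's other threads ever pick up stream work, so the stream's parallelism is limited to the common pool's workers plus the one thread that started it.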

Here is the JDK 8 implementation of fork(), which should make this clearer:

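public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        // Called from a ForkJoinPool worker: push onto that worker's own queue (same pool).
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        // Called from any other thread: hand the task to the common pool.
        ForkJoinPool.common.externalPush(this);
    return this;
}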

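The threads created by Executors.newCachedThreadPool() are not of type ForkJoinWorkerThread, so the new subtasks are pushed to the common pool. That pool is sized for CPU-bound work, which is too small for tasks that spend most of their time blocked on database reads; a ForkJoinPool(50), by contrast, keeps the subtasks on its own 50 workers.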
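The check that fork() performs can be reproduced directly. The sketch below (hypothetical names) evaluates Thread.currentThread() instanceof ForkJoinWorkerThread on a thread from each kind of pool and prints the common pool's parallelism next to the processor count.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;

class WorkerThreadCheck {

    // Same test ForkJoinTask.fork() applies: true means forked subtasks stay in this thread's pool,
    // false means they are pushed to the common pool.
    static boolean isForkJoinWorker() {
        return Thread.currentThread() instanceof ForkJoinWorkerThread;
    }

    // Runs the check on one of the pool's threads and returns the answer.
    static boolean checkIn(ExecutorService pool) throws Exception {
        return pool.submit(() -> isForkJoinWorker()).get();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService cached = Executors.newCachedThreadPool();
        ForkJoinPool custom = new ForkJoinPool(50);
        try {
            System.out.println("newCachedThreadPool thread: " + checkIn(cached)); // prints "newCachedThreadPool thread: false"
            System.out.println("ForkJoinPool(50) thread: " + checkIn(custom));    // prints "ForkJoinPool(50) thread: true"
            System.out.println("available processors: " + Runtime.getRuntime().availableProcessors());
            System.out.println("common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());
        } finally {
            cached.shutdown();
            custom.shutdown();
        }
    }
}
```

With default settings the last two lines typically differ by one, unless the common pool size has been overridden with the java.util.concurrent.ForkJoinPool.common.parallelism system property.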
