spl spl - 4 months ago 35
Java Question

Can Java 8 streams cause an O(1) memory reduction on unbounded data to become O(n) memory because of the underlying ForkJoin implementation?

I've written a streams implementation that performs four simple reductions (+ and <) on the lines of a file.

At first I ran four separate streams, but I decided to write my own accumulator and combiner so that I could perform all four reductions in one stream. On small data sets (10,000,000 lines) this reduces the runtime to about 1/4, as expected, and it runs in 14 seconds on my hardware.

fileIn = new BufferedReader(new InputStreamReader(
        new URL(args[0].trim()).openStream()));

final Results results = fileIn.lines()
        .collect(Results::new, Results::accumulate, Results::combine);

Results::accumulate and Results::combine correctly combine Users into Results and Results with Results respectively, and this implementation works great for small data sets.
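For readers without the original classes, here is a minimal sketch of what a single-pass accumulator and combiner of this kind might look like. The class name ResultsSketch, its fields, and the summarize helper are all hypothetical stand-ins, not the Results class from the question, and it parses plain numbers rather than Users:

```java
import java.util.List;

// Hypothetical stand-in for the question's Results class: it tracks a count,
// a sum and a minimum in one pass. Names and fields are illustrative only.
class ResultsSketch {
    long count = 0;
    long sum = 0;
    long min = Long.MAX_VALUE;

    // Accumulator: folds one parsed value into this partial result.
    void accumulate(long value) {
        count++;
        sum += value;
        min = Math.min(min, value);
    }

    // Combiner: merges another partial result (from a parallel split) into this one.
    void combine(ResultsSketch other) {
        count += other.count;
        sum += other.sum;
        min = Math.min(min, other.min);
    }

    // Runs all the reductions in a single mutable reduction over the lines.
    static ResultsSketch summarize(List<String> lines) {
        return lines.parallelStream()
                .map(Long::parseLong)
                .collect(ResultsSketch::new, ResultsSketch::accumulate, ResultsSketch::combine);
    }
}
```

The partial result has a fixed size no matter how many lines are folded into it, which is why the reduction itself should not need O(n) memory.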

I tried using .reduce() as well, and the results are similar, but I settled on .collect() to reduce the creation of short-lived objects.

The problem is that when I use real-world sized data with 1 billion lines, I hit an issue that suggests Java 8 streams are incapable of the task. In JConsole, heap usage climbs roughly linearly to the allocated 12 GB and then the JVM runs out of memory.

I was under the impression that the collector or reducer would provide performance comparable to an iterative solution, which should be bounded by CPU and IO but not by memory, because the reduction step produces a Results object that doesn't grow. It's a reduction!

When I take a heap dump and load it into jhat, I see that about 7 GB is taken up by Strings, and these Strings are clearly the lines of the input file. I feel they should not be in memory at all, but jhat shows a very large ForkJoin-related structure accumulating in memory:

Static reference from java.util.concurrent.ForkJoinPool.common (from class java.util.concurrent.ForkJoinPool) :

--> java.util.concurrent.ForkJoinPool@0x786d41db0 (76 bytes) (field workQueues:)
--> [Ljava.util.concurrent.ForkJoinPool$WorkQueue;@0x786eda598 (144 bytes) (Element 3 of [Ljava.util.concurrent.ForkJoinPool$WorkQueue;@0x786eda598:)
--> java.util.concurrent.ForkJoinPool$WorkQueue@0x786d41ee8 (96 bytes) (field currentSteal:)
--> java.util.stream.SliceOps$SliceTask@0x7b4ac6cb0 (130 bytes) (field completer:)
--> java.util.stream.SliceOps$SliceTask@0x7b379ad18 (130 bytes) (field completer:)
--> java.util.stream.SliceOps$SliceTask@0x7b25bdb68 (130 bytes) (field leftChild:)
--> java.util.stream.SliceOps$SliceTask@0x7b379acb8 (130 bytes) (field localResult:)
--> java.util.stream.Nodes$SpinedNodeBuilder@0x7b25fdda0 (53 bytes) (field spine:)
--> [[Ljava.lang.Object;@0x7b25ffe48 (144 bytes) (Element 12 of [[Ljava.lang.Object;@0x7b25ffe48:)
--> [Ljava.lang.Object;@0x7b37c4f20 (262160 bytes) (Element 19598 of [Ljava.lang.Object;@0x7b37c4f20:)
--> 31ea87ba876505645342b31928394b3c,2013-11-24T23:02:17+00:00,898,22200,1314,700 (28 bytes) (field value:)
--> [C@0x7b2ffff88 (170 bytes) // <<<< There are thousands of these

There are other references in ApplicationShutdownHooks, Local references, and System Classes, but the one I show is the crux of the problem, and it causes the memory to grow O(n) when it should be O(1).

Does the streams implementation make this O(1) memory problem O(n) memory by holding all the Strings in the ForkJoin classes?? I love streams and I don't want this to be so :(

spl spl

Thanks to Marko Topolnik and Holger for arriving at the correct answer. Neither posted an answer for me to accept, so I'll try to tie this up for posterity :)

The .skip(1) is very expensive on a parallel stream because the stream is ordered, so it has to skip exactly the first entry in encounter order rather than just any one entry, as per the Javadoc for Stream.skip().

Reading the first line of the BufferedReader before calling .lines() on it does successfully skip the first line in my implementation.
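A minimal sketch of that approach, assuming a header line followed by one number per line. The class and method names are hypothetical, and a StringReader stands in for the real URL stream:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;

class HeaderSkipSketch {
    // Consumes the header with readLine() before the stream is created, so
    // the parallel pipeline needs no skip(1) at all.
    static long sumAfterHeader(BufferedReader reader) {
        try {
            reader.readLine(); // discard the header line
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return reader.lines()
                .parallel()
                .mapToLong(Long::parseLong)
                .sum();
    }

    public static void main(String[] args) {
        // Hypothetical in-memory input standing in for the real file.
        BufferedReader reader = new BufferedReader(new StringReader("value\n1\n2\n3\n"));
        System.out.println(sumAfterHeader(reader)); // prints 6
    }
}
```

The reader is only touched before the terminal operation starts, which is what the BufferedReader.lines() contract requires.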

Removing the .skip() then solves the memory problem: in JConsole, heap usage bounces around nicely and drops back below 1 GB on every garbage collection, even when the program processes 1 billion lines. This is the desired behaviour and is close enough to O(1) memory for my purposes.

Contrary to a suggestion above, the relative locations of .parallel() and .skip(1) do not matter: you cannot re-order them to make the .skip(1) happen "before" the .parallel(). The builder pattern suggests that ordering is important, and it is for other intermediate operations, but not for this one. I remember this subtlety from my OCP certification materials, but it doesn't appear to be in the Javadoc, hence no reference. I have, however, confirmed this experimentally by making the isolated change and observing the regression in JConsole, along with the associated OOM.
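A small way to see this without a billion-line file is to ask the stream whether it would run in parallel. This is only a sketch with hypothetical class and method names, and it checks the execution mode with isParallel() rather than measuring memory:

```java
import java.util.stream.Stream;

class ParallelFlagSketch {
    // skip(1) written "before" parallel(): the pipeline is still parallel.
    static boolean skipThenParallel() {
        return Stream.of("header", "a", "b").skip(1).parallel().isParallel();
    }

    // parallel() written before skip(1): same execution mode.
    static boolean parallelThenSkip() {
        return Stream.of("header", "a", "b").parallel().skip(1).isParallel();
    }

    // A later sequential() call switches the whole pipeline back.
    static boolean parallelThenSequential() {
        return Stream.of("header", "a", "b").parallel().skip(1).sequential().isParallel();
    }
}
```

The first two methods both return true and the third returns false, which is consistent with the mode being a property of the whole pipeline rather than of the stages that follow the call.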