I've written a streams implementation that performs four simple reductions (+ and <) on the lines of a file.
At first I used four separate streams, but then I wrote my own accumulator and combiner so that I could perform all four reductions in a single stream. On small data sets (10,000,000 lines) this reduces runtime to about 1/4, as expected, and runs in 14 seconds on my hardware.
fileIn = new BufferedReader(new InputStreamReader(/* underlying InputStream elided */));
final Results results = fileIn.lines()
        .collect(Results::new, Results::accumulate, Results::combine);
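For context, a minimal sketch of what such a Results container might look like. The field names, the split into two sums and two maxima, and the CSV column positions (guessed from the sample data line in the heap dump below) are all assumptions, not the actual implementation:

```java
import java.util.stream.Stream;

// Hypothetical Results container: mutable state for all four reductions so a
// single pass over the lines suffices. Which columns are summed vs. maximised
// is assumed here, not taken from the real code.
class Results {
    long sumA, sumB;   // the "+" reductions
    long maxC, maxD;   // the "<" (running maximum) reductions

    void accumulate(String line) {
        String[] f = line.split(",");
        sumA += Long.parseLong(f[2]);
        sumB += Long.parseLong(f[3]);
        maxC = Math.max(maxC, Long.parseLong(f[4]));
        maxD = Math.max(maxD, Long.parseLong(f[5]));
    }

    void combine(Results other) {   // merges partial results from parallel chunks
        sumA += other.sumA;
        sumB += other.sumB;
        maxC = Math.max(maxC, other.maxC);
        maxD = Math.max(maxD, other.maxD);
    }
}
```

Because combine() is associative and accumulate() only touches its own Results instance, the three-argument collect() can run this safely in parallel.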
Static reference from java.util.concurrent.ForkJoinPool.common (from class java.util.concurrent.ForkJoinPool) :
--> java.util.concurrent.ForkJoinPool@0x786d41db0 (76 bytes) (field workQueues:)
--> [Ljava.util.concurrent.ForkJoinPool$WorkQueue;@0x786eda598 (144 bytes) (Element 3 of [Ljava.util.concurrent.ForkJoinPool$WorkQueue;@0x786eda598:)
--> java.util.concurrent.ForkJoinPool$WorkQueue@0x786d41ee8 (96 bytes) (field currentSteal:)
--> java.util.stream.SliceOps$SliceTask@0x7b4ac6cb0 (130 bytes) (field completer:)
--> java.util.stream.SliceOps$SliceTask@0x7b379ad18 (130 bytes) (field completer:)
--> java.util.stream.SliceOps$SliceTask@0x7b25bdb68 (130 bytes) (field leftChild:)
--> java.util.stream.SliceOps$SliceTask@0x7b379acb8 (130 bytes) (field localResult:)
--> java.util.stream.Nodes$SpinedNodeBuilder@0x7b25fdda0 (53 bytes) (field spine:)
--> [[Ljava.lang.Object;@0x7b25ffe48 (144 bytes) (Element 12 of [[Ljava.lang.Object;@0x7b25ffe48:)
--> [Ljava.lang.Object;@0x7b37c4f20 (262160 bytes) (Element 19598 of [Ljava.lang.Object;@0x7b37c4f20:)
--> 31ea87ba876505645342b31928394b3c,2013-11-24T23:02:17+00:00,898,22200,1314,700 (28 bytes) (field value:)
--> [C@0x7b2ffff88 (170 bytes) // <<<< There are thousands of these
Thanks to Marko Topolnik and Holger for arriving at the correct answer. Neither posted an answer for me to accept, so I'll try to tie this up for posterity :)
.skip(1) is very expensive on a parallel stream because it requires ordering to skip exactly the first entry, as per the Javadoc for Stream.skip().
Reading the first line off the BufferedReader before calling .lines() on it does successfully skip the first line in my implementation. Removing the .skip() then solves the memory problem: in JConsole, heap usage bounces around nicely and returns to under 1 GB on every garbage collection, even when the program processes 1 billion lines. This is the desired behaviour, and is close enough to O(1) memory for my purposes.
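The workaround can be sketched in isolation (counting lines here instead of the real reductions; the in-memory StringReader just stands in for the file):

```java
import java.io.BufferedReader;
import java.io.StringReader;

BufferedReader in = new BufferedReader(new StringReader("header\na\nb\nc"));
String header = in.readLine();  // consume the header before streaming...
long count = in.lines()         // ...so the stream begins at the second line
        .parallel()             // no .skip(1), so no ordering constraint
        .count();
```

Since the header never enters the stream at all, the parallel pipeline no longer has to track encounter order just to drop one element.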
Contrary to a suggestion above, the relative locations of .skip(1) and .parallel() do not matter: you cannot re-order them to make the .skip(1) happen "before" the .parallel(). The builder pattern suggests that ordering is important, and it is for some other intermediate operations, but not for this one. I remember this subtlety from my OCP certification materials, but it doesn't appear to be in the Javadoc, hence no reference. I have, however, confirmed it experimentally by making that isolated change and observing the regression, and the associated OOM, in JConsole.
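The position-independence is easy to confirm on a synthetic ordered source; a minimal sketch:

```java
import java.util.stream.LongStream;

// skip(1) is a property of the pipeline as a whole: on an ordered source it
// drops exactly the first element in encounter order, regardless of whether
// it appears before or after parallel().
long afterParallel  = LongStream.rangeClosed(1, 1000).parallel().skip(1).sum();
long beforeParallel = LongStream.rangeClosed(1, 1000).skip(1).parallel().sum();
// both drop the element 1, so both sums are 500499
```

Both pipelines produce the same result and incur the same ordering cost, which matches the experimental observation above.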