Raul Guiu - 5 months ago
Java Question

Mutable parameters in Java 8 Streams

Looking at this question: How to dynamically do filtering in Java 8?

The issue is to truncate a stream after a filter has been executed. I can't use limit because I don't know how long the list is after the filter. So, could we count the elements after the filter?

So, I thought I could create a class that counts and pass the stream through a map. The code is in this answer.

I created a class that counts the elements but leaves them unaltered. I use a Function here, to avoid the lambdas I used in the other answer:

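import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Identity function that counts how many elements pass through it.
class DoNothingButCount<T> implements Function<T, T> {
    final AtomicInteger i = new AtomicInteger(0);

    @Override
    public T apply(T p) {
        i.incrementAndGet(); // count the element
        return p;            // pass it on unchanged
    }
}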


So my Stream was finally:

persons.stream()
    .filter(u -> u.size > 12)
    .filter(u -> u.weight > 12)
    .map(counter)
    .sorted((p1, p2) -> p1.age - p2.age)
    .collect(Collectors.toList())
    .stream()
    .limit((int) (counter.i.intValue() * 0.5))
    .sorted((p1, p2) -> p2.length - p1.length)
    .limit((int) (counter.i.intValue() * 0.5 * 0.2))
    .forEach((p) -> System.out.println(p));


But my question is about another part of my example:

collect(Collectors.toList()).stream().


If I remove that line, the counter is zero when limit is executed. I am somehow cheating the "effectively final" requirement by using a mutable object.

I may be wrong, but I understand that the stream is built first, so if we use mutable objects to pass parameters to any of the steps in the stream, their values are read when the stream is created.

My question is, if my assumption is right, why is this needed? The stream (if non-parallel) could be passed sequentially through all the steps (filter, map, ...), so this limitation would not be needed.

Answer

Short answer

My question is, if my assumption is right, why is this needed? The stream (if non-parallel) could be passed sequentially through all the steps (filter, map, ...), so this limitation would not be needed.

As you already know, for parallel streams the reason is fairly obvious: the limitation is needed because otherwise the result would be nondeterministic.

Regarding non-parallel streams, it is not possible because of their current design: each item is visited only once. If streams worked as you suggest, they would run each step on the whole collection before going to the next step, which would probably hurt performance. I suspect that's why the language designers made that decision.


Why it technically does not work without collect

You already know that, but here is the explanation for other readers. From the docs:

Streams are lazy; computation on the source data is only performed when the terminal operation is initiated, and source elements are consumed only as needed.

Every intermediate operation of Stream, such as filter() or limit(), is actually just a kind of setter: it records a step in the pipeline and processes no elements yet.

When you call a terminal operation, such as forEach(), collect() or count(), that's when the computation happens, processing items following the pipeline previously built.

This is why limit()'s argument is evaluated before a single item has gone through the first step of the stream. That's why you need to end the stream with a terminal operation and start a new one, whose limit() argument is known by then.
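To make the timing concrete, here is a minimal sketch (not the asker's exact code) that compares a single pipeline with the collect-then-restream variant. The class name LazyLimitDemo, its two methods and the n > 2 filter are made up for illustration; only standard java.util.stream calls are used.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

// Hypothetical demo class, not part of the original question.
class LazyLimitDemo {

    // Single pipeline: limit()'s argument is computed while the pipeline
    // is being built, before any element has been counted.
    static List<Integer> singlePipeline(List<Integer> source) {
        AtomicInteger counter = new AtomicInteger();
        return source.stream()
                .filter(n -> n > 2)
                .peek(n -> counter.incrementAndGet())
                .limit(counter.get() / 2) // counter is still 0, so this is limit(0)
                .collect(Collectors.toList());
    }

    // Two pipelines: collect() is a terminal operation, so the first
    // pipeline has fully run and the filtered size is known.
    static List<Integer> twoPipelines(List<Integer> source) {
        List<Integer> filtered = source.stream()
                .filter(n -> n > 2)
                .collect(Collectors.toList());
        return filtered.stream()
                .limit(filtered.size() / 2)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 4, 5, 6);
        System.out.println(singlePipeline(source)); // prints []
        System.out.println(twoPipelines(source));   // prints [3, 4]
    }
}
```

Note that once the filtered list has been collected, its size() gives the count directly, so the mutable counter is no longer needed in the second variant.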

More detailed answer: why it is not allowed for parallel streams

Let your stream pipeline be step X > step Y > step Z.

We want the items to be processed in parallel. Therefore, if we allow step Y's behavior to depend on the items that have already gone through X, then Y is nondeterministic. This is because, at the moment an item arrives at step Y, the set of items that have already gone through X won't be the same across executions (because of the threading).
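A rough sketch of that situation, under assumptions of my own: step X is a peek() that increments a shared counter, and step Y is a filter() that keeps an item only when the count seen so far is even. The class ParallelCounterDemo and the even/odd rule are hypothetical and only serve to make Y depend on X's history.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

// Hypothetical demo class, not part of the original question.
class ParallelCounterDemo {

    // Counts how many of the n items survive step Y.
    static long survivors(int n, boolean parallel) {
        AtomicInteger counter = new AtomicInteger();
        IntStream stream = IntStream.range(0, n);
        if (parallel) {
            stream = stream.parallel();
        }
        return stream
                .peek(i -> counter.incrementAndGet())   // step X: count the item
                .filter(i -> counter.get() % 2 == 0)    // step Y: depends on X's history
                .count();
    }

    public static void main(String[] args) {
        // Sequential: each item sees exactly its own position, so the result is stable.
        System.out.println("sequential: " + survivors(10_000, false));
        // Parallel: the count an item sees depends on thread scheduling,
        // so the result may differ from run to run.
        for (int run = 0; run < 3; run++) {
            System.out.println("parallel:   " + survivors(10_000, true));
        }
    }
}
```

The sequential call always keeps every second item (5000 of 10000 here), while nothing more specific than "somewhere between 0 and n" can be promised for the parallel calls.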

More detailed answer: why it is not allowed for non-parallel streams

A stream, by definition, is used to process the items in a flow. You could think of a non-parallel stream as follows: one single item goes through all the steps, then the next one goes through all the steps, etc. In fact, the doc says it all:

The elements of a stream are only visited once during the life of a stream. Like an Iterator, a new stream must be generated to revisit the same elements of the source.

If streams didn't work like this, they wouldn't be any better than just doing each step on the whole collection before going to the next step. That would actually allow mutable parameters in non-parallel streams, but it would probably have a performance impact (because we would iterate over the collection multiple times). Anyway, their current behavior does not allow what you want.
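The "one item goes through all the steps, then the next one" picture can be observed with a small trace. This is only an illustrative sketch: the class TraversalOrderDemo and the log format are invented, and it covers the simple case of a sequential stream with stateless steps (filter, map).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class, not part of the original question.
class TraversalOrderDemo {

    // Records, in order, every step that every element goes through.
    static List<String> trace(List<String> source) {
        List<String> log = new ArrayList<>();
        source.stream()
                .filter(s -> { log.add("filter " + s); return true; })
                .map(s -> { log.add("map " + s); return s.toUpperCase(); })
                .forEach(s -> log.add("forEach " + s));
        return log;
    }

    public static void main(String[] args) {
        // prints [filter a, map a, forEach A, filter b, map b, forEach B]
        System.out.println(trace(Arrays.asList("a", "b")));
    }
}
```

The log interleaves the steps per element instead of listing all the filter calls first, which is why no step can know how many elements the previous step will eventually let through.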