Semafoor - 6 months ago
Java Question

Limiting infinite parallel stream

1) How can I use a Supplier (supplier) to create a sized stream of N values in parallel, while ensuring that no more than N calls are made to the supplier? I need this because I have a supplier with a costly supplier.get() operation.

2) The 'obvious' answer to my question, Stream.generate(supplier).limit(N) on a parallel stream, does not work and often results in more than N calls being made to the supplier. Why is this?

As 'proof' that Stream.generate(supplier).limit(N) can result in more than N calls to supplier.get() when the stream is parallel, consider the following code:

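import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;
import java.util.function.Supplier;
import java.util.stream.IntStream;

public class MWE {
    static final int N_ELEMENTS = 100000;
    // Each call to mySupplier.get() creates a fresh counter that yields 0, 1, 2, ...
    static Supplier<IntSupplier> mySupplier = () -> new IntSupplier() {
        AtomicInteger ai = new AtomicInteger(-1);
        @Override
        public int getAsInt() {
            return ai.incrementAndGet();
        }
    };
    public static void main(String[] args) {
        // Sequential: a == [0, 1, ..., N_ELEMENTS-1]
        int[] a = IntStream.generate(mySupplier.get()).limit(N_ELEMENTS).toArray();
        // Parallel: b may contain values >= N_ELEMENTS
        int[] b = IntStream.generate(mySupplier.get()).parallel().limit(N_ELEMENTS).toArray();
    }
}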


a is equal to [0, 1, ..., N_ELEMENTS-1] as expected, but, contrary to what you might expect, b does not contain the same elements as a. Instead, b often contains elements that are greater than or equal to N_ELEMENTS, which indicates more than N_ELEMENTS calls to the supplier (if it does not on your computer, try increasing the value of N_ELEMENTS).
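To see the extra calls directly, instead of inferring them from values >= N_ELEMENTS, one can simply count the supplier invocations. The sketch below is an illustration rather than part of the original MWE; the class CallCounter and the method countCalls are hypothetical names, and the exact parallel count will vary between runs and machines.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;

public class CallCounter {
    // Runs IntStream.generate(...).limit(n) and returns how often the supplier was called.
    static long countCalls(int n, boolean parallel) {
        AtomicLong calls = new AtomicLong();
        IntStream s = IntStream.generate(() -> (int) calls.getAndIncrement());
        if (parallel) {
            s = s.parallel();
        }
        int[] result = s.limit(n).toArray();
        // The stream itself always honors the limit...
        if (result.length != n) {
            throw new IllegalStateException("unexpected length " + result.length);
        }
        // ...but the supplier may have been called more often than that.
        return calls.get();
    }

    public static void main(String[] args) {
        int n = 100_000;
        System.out.println("sequential calls: " + countCalls(n, false));
        System.out.println("parallel calls:   " + countCalls(n, true));
    }
}
```

For the sequential stream the count should equal n exactly; for the parallel one it is at least n and, as described above, often larger.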

Answer

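Calling .limit() is not guaranteed to result in a stream of the first N elements generated by the supplier, because Stream.generate() creates an unordered stream, which leaves limit() free to decide which 'part' of the stream to keep. Strictly speaking, it is not even meaningful to refer to "the first N elements" or "(the first) part of the stream", because the stream is unordered. The API documentation states this explicitly: Stream.generate() returns an "infinite sequential unordered stream". Many thanks to everyone who pointed this out to me! As for the extra calls: as far as I can tell, in the OpenJDK implementation, limit() on an unordered parallel stream lets each worker thread pull elements from the source optimistically in chunks and only afterwards check how many of them the limit still admits. Surplus elements are discarded, but the supplier has already been called to produce them.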
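The difference in encounter order can be checked by asking each stream's source spliterator for its characteristics. In this sketch (OrderCheck and its methods are hypothetical names), the spliterator behind IntStream.generate() does not report ORDERED, while the one behind IntStream.range() does, which is why limit() may keep any N elements of the former.

```java
import java.util.Spliterator;
import java.util.stream.IntStream;

public class OrderCheck {
    // Does the source of a generated stream report an encounter order?
    static boolean generatedIsOrdered() {
        return IntStream.generate(() -> 42).spliterator()
                .hasCharacteristics(Spliterator.ORDERED);
    }

    // Does the source of a range stream report an encounter order?
    static boolean rangeIsOrdered() {
        return IntStream.range(0, 10).spliterator()
                .hasCharacteristics(Spliterator.ORDERED);
    }

    public static void main(String[] args) {
        System.out.println("generate() ORDERED: " + generatedIsOrdered()); // false
        System.out.println("range()    ORDERED: " + rangeIsOrdered());     // true
    }
}
```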

Since asking this question, I have come up with two solutions to my own question. My thanks go to Tagir who set me off in the right direction.

Solution 1: Misusing IntStream.range()

A simple and fairly efficient way of creating a sized, parallel stream backed by a supplier, without making more calls to the supplier than necessary, is to (mis)use IntStream.range() like this:

IntStream.range(0, N_ELEMENTS).parallel().mapToObj($ -> supplier.get())
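A self-contained version of solution 1 might look like the sketch below. RangeSolution and generateN are illustrative names, and the supplier in main is a hypothetical stand-in for the costly one that merely counts its invocations. Because the range is SIZED and mapToObj() neither filters nor short-circuits, collecting the stream should call the supplier exactly once per index.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class RangeSolution {
    // Produces n supplied values in parallel; the range only provides size and splittability.
    static <T> List<T> generateN(Supplier<T> supplier, int n) {
        return IntStream.range(0, n)
                .parallel()
                .mapToObj(i -> supplier.get()) // the index i is deliberately ignored
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        AtomicLong calls = new AtomicLong();
        // Stand-in for the costly supplier; AtomicLong keeps it thread-safe.
        Supplier<Long> costly = calls::incrementAndGet;
        List<Long> values = generateN(costly, 100_000);
        System.out.println(values.size() + " values, " + calls.get() + " supplier calls");
    }
}
```

The supplier is invoked concurrently from several worker threads, so it must be thread-safe. The list keeps the range's encounter order, but which supplied value ends up at which index depends on thread scheduling.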

Basically, we are using IntStream.range() only to create a sized stream that can be processed in parallel.

Solution 2: Custom spliterator

Because we never actually use the integers in the stream created by IntStream.range(), it seems we can do slightly better by writing a custom Spliterator:

import java.util.Objects;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Split-off instances share the same supplier, so in a parallel stream
// supplier.get() is called concurrently and must be thread-safe.
final class SizedSuppliedSpliterator<T> implements Spliterator<T> {
    private int remaining; // number of elements this spliterator may still supply

    private final Supplier<T> supplier;

    private SizedSuppliedSpliterator(Supplier<T> supplier, int remaining) {
        this.remaining = remaining;
        this.supplier = supplier;
    }

    static <T> SizedSuppliedSpliterator<T> of(Supplier<T> supplier, int limit) {
        return new SizedSuppliedSpliterator<>(supplier, limit);
    }

    @Override
    public boolean tryAdvance(final Consumer<? super T> consumer) {
        Objects.requireNonNull(consumer);
        if (remaining > 0) {
            remaining--;
            final T supplied = supplier.get();
            consumer.accept(supplied);
            return true;
        }
        return false;
    }

    @Override
    public void forEachRemaining(final Consumer<? super T> consumer) {
        Objects.requireNonNull(consumer);
        while (remaining > 0) {
            consumer.accept(supplier.get());
            remaining--;
        }
    }

    @Override
    public SizedSuppliedSpliterator<T> trySplit() {
        // Fewer than two elements cannot be split; the contract asks for null here.
        if (remaining < 2) {
            return null;
        }
        int split = remaining / 2;
        remaining -= split;
        return new SizedSuppliedSpliterator<>(supplier, split);
    }

    @Override
    public long estimateSize() {
        return remaining;
    }

    @Override
    public int characteristics() {
        return SIZED | SUBSIZED | IMMUTABLE;
    }
}

We can use this spliterator to create the stream as follows:

StreamSupport.stream(SizedSuppliedSpliterator.of(supplier, N_ELEMENTS), true)

Of course, computing a couple of integers is hardly expensive, and I have not been able to notice or even measure any improvement in performance over solution 1.