Martin Boyanov - 5 months ago
Java Question

Java 8 stream combiner never called

I'm writing a custom Java 8 collector which is supposed to compute the average over POJOs that have a getValue() method. Here's the code:

public static Collector<BoltAggregationData, BigDecimal[], BigDecimal> avgCollector = new Collector<BoltAggregationData, BigDecimal[], BigDecimal>() {

    @Override
    public Supplier<BigDecimal[]> supplier() {
        return () -> {
            BigDecimal[] start = new BigDecimal[2];
            start[0] = BigDecimal.ZERO;
            start[1] = BigDecimal.ZERO;
            return start;
        };
    }

    @Override
    public BiConsumer<BigDecimal[], BoltAggregationData> accumulator() {
        return (a, b) -> {
            a[0] = a[0].add(b.getValue());
            a[1] = a[1].add(BigDecimal.ONE);
        };
    }

    @Override
    public BinaryOperator<BigDecimal[]> combiner() {
        return (a, b) -> {
            a[0] = a[0].add(b[0]);
            a[1] = a[1].add(b[1]);
            return a;
        };
    }

    @Override
    public Function<BigDecimal[], BigDecimal> finisher() {
        return (a) -> {
            return a[0].divide(a[1], 6, RoundingMode.HALF_UP);
        };
    }

    private final Set<Characteristics> CHARACTERISTICS = new HashSet<Characteristics>(Arrays.asList(Characteristics.CONCURRENT, Characteristics.UNORDERED));

    @Override
    public Set<Characteristics> characteristics() {
        return CHARACTERISTICS;
    }

};


It all works well in the non-parallel case. However, when I use parallelStream(), it sometimes gives a wrong result. For example, given the values from 1 to 10, it computes 53/9 instead of 55/10. When debugging, the debugger never hits the breakpoint in the combiner() function. Is there some kind of flag that I need to set?

Answer

It looks like the problem is the CONCURRENT characteristic, which does not mean what you might expect. From the Javadoc:

Indicates that this collector is concurrent, meaning that the result container can support the accumulator function being called concurrently with the same result container from multiple threads.

Instead of giving each thread its own container and merging the containers with the combiner, the stream calls the accumulator concurrently on one shared BigDecimal[] a for all threads. The read-modify-write on a[0] is not atomic, so updates can be lost:

Thread1 -> retrieves value of a[0]: 3
Thread2 -> retrieves value of a[0]: 3
Thread1 -> adds own value: 3 + 3 = 6
Thread2 -> adds own value: 3 + 4 = 7
Thread1 -> writes 6 to a[0]
Thread2 -> writes 7 to a[0]

This leaves a[0] at 7 when it should be 10. The same kind of lost update can happen with a[1], so results can be inconsistent.


If you remove the CONCURRENT characteristic, the combiner will get used instead.
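
For illustration, here is a minimal sketch of the collector without CONCURRENT, written with Collector.of instead of the anonymous class. The BoltAggregationData class below is a hypothetical stand-in (only getValue() comes from the question), and the class name AvgCollectorSketch and the helper averageOfOneToTen() are made up for the example. Keeping UNORDERED is an assumption that encounter order does not matter for an average.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class AvgCollectorSketch {

    // Hypothetical stand-in for the question's POJO; only getValue() is from the original.
    static final class BoltAggregationData {
        private final BigDecimal value;

        BoltAggregationData(BigDecimal value) {
            this.value = value;
        }

        BigDecimal getValue() {
            return value;
        }
    }

    // CONCURRENT is not declared, so a parallel stream creates a separate
    // BigDecimal[] per chunk via the supplier and merges them with the combiner.
    static final Collector<BoltAggregationData, BigDecimal[], BigDecimal> AVG_COLLECTOR =
            Collector.of(
                    // supplier: a[0] holds the running sum, a[1] the running count
                    () -> new BigDecimal[] { BigDecimal.ZERO, BigDecimal.ZERO },
                    // accumulator: only ever touches a container owned by one thread
                    (a, b) -> {
                        a[0] = a[0].add(b.getValue());
                        a[1] = a[1].add(BigDecimal.ONE);
                    },
                    // combiner: merges two partial results
                    (a, b) -> {
                        a[0] = a[0].add(b[0]);
                        a[1] = a[1].add(b[1]);
                        return a;
                    },
                    // finisher: sum / count, same scale and rounding as the question
                    a -> a[0].divide(a[1], 6, RoundingMode.HALF_UP),
                    Collector.Characteristics.UNORDERED);

    // Averages the values 1..10 on a parallel stream.
    static BigDecimal averageOfOneToTen() {
        List<BoltAggregationData> data = IntStream.rangeClosed(1, 10)
                .mapToObj(i -> new BoltAggregationData(BigDecimal.valueOf(i)))
                .collect(Collectors.toList());
        return data.parallelStream().collect(AVG_COLLECTOR);
    }

    public static void main(String[] args) {
        System.out.println(averageOfOneToTen()); // prints 5.500000
    }
}
```

With this version a breakpoint in the combiner lambda should be reachable whenever the parallel stream actually splits the input, since each chunk is accumulated into its own array.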